This is an automated email from the ASF dual-hosted git repository.

codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5e1ebe7fb91 [HUDI-8452] Init partition stats based on latest file slices only (#12174)
5e1ebe7fb91 is described below

commit 5e1ebe7fb9167bac5534c2da719341de820320b1
Author: Sagar Sumit <[email protected]>
AuthorDate: Wed Nov 13 15:45:40 2024 +0530

    [HUDI-8452] Init partition stats based on latest file slices only (#12174)
    
    Initialize partition stats based on latest file slices only instead of all files in a partition.
---
 .../metadata/HoodieBackedTableMetadataWriter.java  |   6 +-
 .../hudi/common/config/HoodieMetadataConfig.java   |  18 ---
 .../hudi/metadata/HoodieTableMetadataUtil.java     |  57 ++++----
 .../hudi/metadata/TestHoodieTableMetadataUtil.java |  22 ++-
 .../TestPartitionStatsIndexWithSql.scala           | 151 ++++++++++-----------
 .../TestHoodieMetadataTableValidator.java          |   3 -
 6 files changed, 109 insertions(+), 148 deletions(-)
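
The core of the change is visible in HoodieTableMetadataUtil below: the util now
receives (partitionPath, FileSlice) pairs and derives each partition's file names
(base file plus log files) from its latest file slices only, instead of listing
every file in the partition directory. A minimal, self-contained Java sketch of
that grouping, using stand-in types in place of Hudi's FileSlice and Pair:

    import java.util.AbstractMap;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Map;
    import java.util.Optional;
    import java.util.Set;
    import java.util.stream.Collectors;

    public class PartitionStatsGroupingSketch {
      // Stand-in for org.apache.hudi.common.model.FileSlice, illustration only.
      static class SliceSketch {
        final Optional<String> baseFileName;
        final List<String> logFileNames;
        SliceSketch(Optional<String> baseFileName, List<String> logFileNames) {
          this.baseFileName = baseFileName;
          this.logFileNames = logFileNames;
        }
      }

      // Mirrors extractFileNames in the patch: the base file name (if present)
      // plus all log file names of one slice.
      static Set<String> extractFileNames(SliceSketch slice) {
        Set<String> fileNames = new HashSet<>(slice.logFileNames);
        slice.baseFileName.ifPresent(fileNames::add);
        return fileNames;
      }

      // Mirrors the new grouping: one (partition -> set of file names) entry
      // per partition, built from the latest slices only.
      static Map<String, Set<String>> groupByPartition(List<Map.Entry<String, SliceSketch>> pairs) {
        return pairs.stream()
            .collect(Collectors.groupingBy(Map.Entry::getKey,
                Collectors.mapping(e -> extractFileNames(e.getValue()), Collectors.toList())))
            .entrySet().stream()
            .collect(Collectors.toMap(Map.Entry::getKey,
                e -> e.getValue().stream().flatMap(Set::stream).collect(Collectors.toSet())));
      }

      public static void main(String[] args) {
        List<Map.Entry<String, SliceSketch>> pairs = Arrays.asList(
            new AbstractMap.SimpleEntry<>("ts=10", new SliceSketch(Optional.of("f1_0.parquet"), Collections.singletonList(".f1.log.1"))),
            new AbstractMap.SimpleEntry<>("ts=10", new SliceSketch(Optional.of("f2_0.parquet"), Collections.emptyList())));
        System.out.println(groupByPartition(pairs));
        // e.g. {ts=10=[f2_0.parquet, .f1.log.1, f1_0.parquet]} (set order varies)
      }
    }

Grouping first keeps the per-partition stats aggregation a single pass, no matter
how many latest slices a partition holds.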

diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 0e8d5339cd2..bf930ac9fce 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -207,7 +207,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
   }
 
   private HoodieMetadataFileSystemView getMetadataView() {
-    if (metadataView == null) {
+    if (metadataView == null || !metadataView.equals(metadata.getMetadataFileSystemView())) {
      ValidationUtils.checkState(metadata != null, "Metadata table not initialized");
      ValidationUtils.checkState(dataMetaClient != null, "Data table meta client not initialized");
      metadataView = new HoodieMetadataFileSystemView(dataMetaClient, dataMetaClient.getActiveTimeline(), metadata);
@@ -503,8 +503,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
     }
   }
 
-  private Pair<Integer, HoodieData<HoodieRecord>> initializePartitionStatsIndex(List<DirectoryInfo> partitionInfoList) {
-    HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(engineContext, partitionInfoList, dataWriteConfig.getMetadataConfig(), dataMetaClient,
+  private Pair<Integer, HoodieData<HoodieRecord>> initializePartitionStatsIndex(List<DirectoryInfo> partitionInfoList) throws IOException {
+    HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(engineContext, getPartitionFileSlicePairs(), dataWriteConfig.getMetadataConfig(), dataMetaClient,
         Option.of(new Schema.Parser().parse(dataWriteConfig.getWriteSchema())));
     final int fileGroupCount = dataWriteConfig.getMetadataConfig().getPartitionStatsIndexFileGroupCount();
     return Pair.of(fileGroupCount, records);
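
Note the second argument change above: the writer now feeds
convertFilesToPartitionStatsRecords from getPartitionFileSlicePairs() rather than
the raw DirectoryInfo listing. A hedged sketch of how such pairs can be assembled
from a file system view (the helper name and the pre-loaded fsView are assumptions
for illustration; getLatestFileSlices is an existing view API):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hudi.common.model.FileSlice;
    import org.apache.hudi.common.table.view.HoodieTableFileSystemView;
    import org.apache.hudi.common.util.collection.Pair;

    class LatestSlicePairsSketch {
      // Pair each partition with its latest file slices only; older slices in a
      // file group no longer contribute to the initial partition stats.
      static List<Pair<String, FileSlice>> latestSlicePairs(HoodieTableFileSystemView fsView, List<String> partitions) {
        List<Pair<String, FileSlice>> pairs = new ArrayList<>();
        for (String partition : partitions) {
          fsView.getLatestFileSlices(partition).forEach(slice -> pairs.add(Pair.of(partition, slice)));
        }
        return pairs;
      }
    }
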
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 332c26f6908..736b21e847a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -358,15 +358,6 @@ public final class HoodieMetadataConfig extends HoodieConfig {
       .sinceVersion("1.0.0")
       .withDocumentation("Parallelism to use, when generating partition stats index.");
 
-  public static final ConfigProperty<Boolean> PARTITION_STATS_INDEX_CONSOLIDATE_ON_EVERY_WRITE = ConfigProperty
-      .key(METADATA_PREFIX + ".index.partition.stats.consolidate.on.every.write")
-      .defaultValue(false)
-      .sinceVersion("1.0.0")
-      .withDocumentation("When enabled, partition stats is consolidated is computed on every commit for the min/max value of every column "
-          + "at the storage partition level. Typically, the min/max range for each column can become wider (i.e. the minValue is <= all valid values "
-          + "in the file, and the maxValue >= all valid values in the file) with updates and deletes. If this config is enabled, "
-          + "the min/max range will be updated to the tight bound of the valid values after every commit for the partitions touched.");
-
   public static final ConfigProperty<Boolean> SECONDARY_INDEX_ENABLE_PROP = ConfigProperty
       .key(METADATA_PREFIX + ".index.secondary.enable")
       .defaultValue(false)
@@ -534,10 +525,6 @@ public final class HoodieMetadataConfig extends HoodieConfig {
     return getInt(PARTITION_STATS_INDEX_PARALLELISM);
   }
 
-  public boolean isPartitionStatsIndexConsolidationEnabledOnEveryWrite() {
-    return getBooleanOrDefault(PARTITION_STATS_INDEX_CONSOLIDATE_ON_EVERY_WRITE);
-  }
-
   public boolean isSecondaryIndexEnabled() {
     // Secondary index is enabled only iff record index (primary key index) is also enabled
     return isRecordIndexEnabled() && getBoolean(SECONDARY_INDEX_ENABLE_PROP);
@@ -749,11 +736,6 @@ public final class HoodieMetadataConfig extends HoodieConfig {
       return this;
     }
 
-    public Builder withPartitionStatsIndexConsolidationEnabledOnEveryWrite(boolean enable) {
-      metadataConfig.setValue(PARTITION_STATS_INDEX_CONSOLIDATE_ON_EVERY_WRITE, String.valueOf(enable));
-      return this;
-    }
-
     public HoodieMetadataConfig build() {
       metadataConfig.setDefaultValue(ENABLE, getDefaultMetadataEnable(engineType));
       metadataConfig.setDefaults(HoodieMetadataConfig.class.getName());
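
With the consolidate-on-every-write knob gone, tight-bound consolidation no longer
needs its own flag. A minimal sketch of the remaining builder calls for enabling
the index (only builder methods that appear elsewhere in this patch are used):

    import org.apache.hudi.common.config.HoodieMetadataConfig;

    class MetadataConfigSketch {
      static HoodieMetadataConfig partitionStatsConfig() {
        // Both toggles are enabled together, as the tests in this patch do.
        return HoodieMetadataConfig.newBuilder()
            .enable(true)
            .withMetadataIndexColumnStats(true)
            .withMetadataIndexPartitionStats(true)
            .build();
      }
    }
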
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index bcd54931d88..994ec6708bd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -61,7 +61,6 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
@@ -2159,7 +2158,7 @@ public class HoodieTableMetadataUtil {
   }
 
  public static HoodieData<HoodieRecord> convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext,
-                                                                             List<DirectoryInfo> partitionInfoList,
+                                                                             List<Pair<String, FileSlice>> partitionInfoList,
                                                                              HoodieMetadataConfig metadataConfig,
                                                                              HoodieTableMetaClient dataTableMetaClient,
                                                                              Option<Schema> writerSchemaOpt) {
@@ -2173,12 +2172,22 @@ public class HoodieTableMetadataUtil {
      return engineContext.emptyHoodieData();
    }
    LOG.debug("Indexing following columns for partition stats index: {}", columnsToIndex);
+
+    // Group by partition path and collect file names (BaseFile and LogFiles)
+    List<Pair<String, Set<String>>> partitionToFileNames = partitionInfoList.stream()
+        .collect(Collectors.groupingBy(Pair::getLeft,
+            Collectors.mapping(pair -> extractFileNames(pair.getRight()), Collectors.toList())))
+        .entrySet().stream()
+        .map(entry -> Pair.of(entry.getKey(),
+            entry.getValue().stream().flatMap(Set::stream).collect(Collectors.toSet())))
+        .collect(Collectors.toList());
+
     // Create records for MDT
-    int parallelism = Math.max(Math.min(partitionInfoList.size(), metadataConfig.getPartitionStatsIndexParallelism()), 1);
-    return engineContext.parallelize(partitionInfoList, parallelism).flatMap(partitionInfo -> {
-      final String partitionPath = partitionInfo.getRelativePath();
+    int parallelism = Math.max(Math.min(partitionToFileNames.size(), metadataConfig.getPartitionStatsIndexParallelism()), 1);
+    return engineContext.parallelize(partitionToFileNames, parallelism).flatMap(partitionInfo -> {
+      final String partitionPath = partitionInfo.getKey();
       // Step 1: Collect Column Metadata for Each File
-      List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata = partitionInfo.getFileNameToSizeMap().keySet().stream()
+      List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata = partitionInfo.getValue().stream()
          .map(fileName -> getFileStatsRangeMetadata(partitionPath, fileName, dataTableMetaClient, columnsToIndex, false,
              metadataConfig.getMaxReaderBufferSize()))
          .collect(Collectors.toList());
@@ -2187,6 +2196,14 @@ public class HoodieTableMetadataUtil {
     });
   }
 
+  private static Set<String> extractFileNames(FileSlice fileSlice) {
+    Set<String> fileNames = new HashSet<>();
+    Option<HoodieBaseFile> baseFile = fileSlice.getBaseFile();
+    baseFile.ifPresent(hoodieBaseFile -> fileNames.add(hoodieBaseFile.getFileName()));
+    fileSlice.getLogFiles().forEach(hoodieLogFile -> fileNames.add(hoodieLogFile.getFileName()));
+    return fileNames;
+  }
+
   private static List<HoodieColumnRangeMetadata<Comparable>> getFileStatsRangeMetadata(String partitionPath,
                                                                                        String fileName,
                                                                                        HoodieTableMetaClient datasetMetaClient,
@@ -2234,8 +2251,7 @@ public class HoodieTableMetadataUtil {
           .collect(Collectors.toList());
 
      int parallelism = Math.max(Math.min(partitionedWriteStats.size(), metadataConfig.getPartitionStatsIndexParallelism()), 1);
-      boolean shouldScanColStatsForTightBound = MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(dataMetaClient)
-          && (metadataConfig.isPartitionStatsIndexConsolidationEnabledOnEveryWrite() || shouldConsolidatePartitionStats(commitMetadata, dataMetaClient));
+      boolean shouldScanColStatsForTightBound = MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(dataMetaClient);
       HoodieTableMetadata tableMetadata;
       if (shouldScanColStatsForTightBound) {
         tableMetadata = HoodieTableMetadata.create(engineContext, dataMetaClient.getStorage(), metadataConfig, dataMetaClient.getBasePath().toString());
@@ -2280,31 +2296,6 @@ public class HoodieTableMetadataUtil {
     }
   }
 
-  private static boolean shouldConsolidatePartitionStats(HoodieCommitMetadata commitMetadata, HoodieTableMetaClient metaClient) {
-    if (WriteOperationType.isPartitionStatsTightBoundRequired(commitMetadata.getOperationType())) {
-      return true;
-    }
-    // if not for compaction or clustering, lets check for any new base file added.
-    HoodieTableFileSystemView fsv = getFileSystemView(metaClient);
-    try {
-      fsv.loadPartitions(new ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet()));
-      // Collect already existing filegroupIds.
-      Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
-      commitMetadata.getPartitionToWriteStats().keySet().forEach(partition -> {
-        partitionToExistingFileIds.put(partition, fsv.getLatestBaseFiles(partition).map(baseFile -> baseFile.getFileId()).collect(Collectors.toList()));
-      });
-      // check if new base file is added to any of existing file groups.
-      // as of now, if any one partition has a new base file added, we are triggering tighter bound computation for all partitions touched in this commit.
-      return commitMetadata.getPartitionToWriteStats().entrySet().stream().filter(entry -> {
-        List<String> existingFileIds = partitionToExistingFileIds.get(entry.getKey());
-        return entry.getValue().stream().filter(writeStat -> writeStat.getPath().contains(".parquet"))
-            .filter(writeStat -> existingFileIds.contains(writeStat.getFileId())).findAny().isPresent();
-      }).findAny().isPresent();
-    } finally {
-      fsv.close();
-    }
-  }
-
   /**
    * Generate key prefixes for each combination of column name in {@param columnsToIndex} and {@param partitionName}.
    */
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index 9586171d97a..32e0d2252a9 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -103,7 +103,7 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
     hoodieTestTable = hoodieTestTable.addCommit(instant1);
     String instant2 = "20230918121110000";
     hoodieTestTable = hoodieTestTable.addCommit(instant2);
-    List<HoodieTableMetadataUtil.DirectoryInfo> partitionInfoList = new ArrayList<>();
+    List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
     // Generate 10 inserts for each partition and populate partitionBaseFilePairs and recordKeys.
     DATE_PARTITIONS.forEach(p -> {
       try {
@@ -131,11 +131,8 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
             engineContext);
         HoodieBaseFile baseFile2 = new HoodieBaseFile(hoodieTestTable.getBaseFilePath(p, fileId2).toString());
         fileSlice2.setBaseFile(baseFile2);
-        partitionInfoList.add(new HoodieTableMetadataUtil.DirectoryInfo(
-            p,
-            metaClient.getStorage().listDirectEntries(Arrays.asList(partitionMetadataPath, storagePath1, storagePath2)),
-            instant2,
-            Collections.emptySet()));
+        partitionFileSlicePairs.add(Pair.of(p, fileSlice1));
+        partitionFileSlicePairs.add(Pair.of(p, fileSlice2));
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -144,7 +141,7 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
     List<String> columnsToIndex = Arrays.asList("rider", "driver");
     HoodieData<HoodieRecord> result = HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(
         engineContext,
-        partitionInfoList,
+        partitionFileSlicePairs,
         HoodieMetadataConfig.newBuilder().enable(true)
             .withMetadataIndexColumnStats(true)
             .withMetadataIndexPartitionStats(true)
@@ -216,7 +213,7 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
     hoodieTestTable = hoodieTestTable.addCommit(instant1, Option.of(commitMetadata));
     String instant2 = "20230918121110000";
     hoodieTestTable = hoodieTestTable.addCommit(instant2);
-    List<HoodieTableMetadataUtil.DirectoryInfo> partitionInfoList = new ArrayList<>();
+    List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
     List<String> columnsToIndex = Arrays.asList("rider", "driver");
     // Generate 10 inserts for each partition and populate partitionBaseFilePairs and recordKeys.
     DATE_PARTITIONS.forEach(p -> {
@@ -237,11 +234,8 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
         writeLogFiles(new StoragePath(metaClient.getBasePath(), p), HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS, dataGen.generateInsertsForPartition(instant2, 10, p), 1,
             metaClient.getStorage(), new Properties(), fileId1, instant2);
         fileSlice2.addLogFile(new HoodieLogFile(storagePath2.toUri().toString()));
-        partitionInfoList.add(new HoodieTableMetadataUtil.DirectoryInfo(
-            p,
-            metaClient.getStorage().listDirectEntries(Arrays.asList(partitionMetadataPath, storagePath1, storagePath2)),
-            instant2,
-            Collections.emptySet()));
+        partitionFileSlicePairs.add(Pair.of(p, fileSlice1));
+        partitionFileSlicePairs.add(Pair.of(p, fileSlice2));
         // NOTE: we need to set table config as we are not using write client explicitly and these configs are needed for log record reader
         metaClient.getTableConfig().setValue(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false");
         metaClient.getTableConfig().setValue(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key");
@@ -261,7 +255,7 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
     // collect partition stats, this will collect stats for log files as well
     HoodieData<HoodieRecord> result = HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(
         engineContext,
-        partitionInfoList,
+        partitionFileSlicePairs,
         HoodieMetadataConfig.newBuilder().enable(true)
             .withMetadataIndexColumnStats(true)
             .withMetadataIndexPartitionStats(true)
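
The reworked tests above build their input directly as (partition, FileSlice)
pairs instead of DirectoryInfo listings. A hedged sketch of that setup with
illustrative paths and ids (FileSlice, HoodieBaseFile, HoodieLogFile and Pair are
the same Hudi types the test uses; the exact file naming is an assumption):

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hudi.common.model.FileSlice;
    import org.apache.hudi.common.model.HoodieBaseFile;
    import org.apache.hudi.common.model.HoodieFileGroupId;
    import org.apache.hudi.common.model.HoodieLogFile;
    import org.apache.hudi.common.util.collection.Pair;

    class TestInputSketch {
      static List<Pair<String, FileSlice>> buildPairs() {
        String partition = "2023/09/18";  // illustrative partition path
        FileSlice slice = new FileSlice(new HoodieFileGroupId(partition, "file-1"), "20230918120000000");
        slice.setBaseFile(new HoodieBaseFile("/tmp/tbl/" + partition + "/file-1_0.parquet"));
        slice.addLogFile(new HoodieLogFile("/tmp/tbl/" + partition + "/.file-1_20230918120000000.log.1"));
        List<Pair<String, FileSlice>> pairs = new ArrayList<>();
        pairs.add(Pair.of(partition, slice));
        return pairs;
      }
    }
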
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
index 05b480c3542..f585b720fcd 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
@@ -285,88 +285,85 @@ class TestPartitionStatsIndexWithSql extends HoodieSparkSqlTestBase {
    * 2. Update to widen the bounds: Perform updates to increase the price values, causing the min/max stats to widen.
    * 3. Update to remove or lower the max value: Delete or update the record that holds the max value to simulate a
    * scenario where the bounds need to be tightened.
-   * 4. Trigger stats recomputation: Assuming the config for tighter bounds is enabled, validate that the partition stats
+   * 4. Trigger stats recomputation: Assuming tighter bounds is enabled on every write, validate that the partition stats
    * have adjusted correctly after scanning and recomputing accurate stats.
    */
   test("Test partition stats index with tight bound") {
     Seq("cow", "mor").foreach { tableType =>
-      Seq("true", "false").foreach { 
isPartitionStatsIndexConsolidationEnabledOnEveryWrite =>
-        withTempDir { tmp =>
-          val tableName = generateTableName + 
s"_tight_bound_$isPartitionStatsIndexConsolidationEnabledOnEveryWrite"
-          val tablePath = s"${tmp.getCanonicalPath}/$tableName"
-          spark.sql(
-            s"""
-               |create table $tableName (
-               |  id int,
-               |  name string,
-               |  price int,
-               |  ts long
-               |) using hudi
-               |partitioned by (ts)
-               |tblproperties (
-               |  type = '$tableType',
-               |  primaryKey = 'id',
-               |  preCombineField = 'price',
-               |  hoodie.metadata.index.partition.stats.enable = 'true',
-               |  
hoodie.metadata.index.partition.stats.consolidate.on.every.write = 
'$isPartitionStatsIndexConsolidationEnabledOnEveryWrite',
-               |  hoodie.metadata.index.column.stats.enable = 'true',
-               |  hoodie.metadata.index.column.stats.column.list = 'price'
-               |)
-               |location '$tablePath'
-               |""".stripMargin
-          )
+      withTempDir { tmp =>
+        val tableName = generateTableName + s"_tight_bound_$tableType"
+        val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price int,
+             |  ts long
+             |) using hudi
+             |partitioned by (ts)
+             |tblproperties (
+             |  type = '$tableType',
+             |  primaryKey = 'id',
+             |  preCombineField = 'price',
+             |  hoodie.metadata.index.partition.stats.enable = 'true',
+             |  hoodie.metadata.index.column.stats.enable = 'true',
+             |  hoodie.metadata.index.column.stats.column.list = 'price'
+             |)
+             |location '$tablePath'
+             |""".stripMargin
+        )
 
-          /**
-           * Insert: Insert values for price across multiple partitions 
(ts=10, ts=20, ts=30):
-           *
-           * Partition ts=10: price = 1000, 1500
-           * Partition ts=20: price = 2000, 2500
-           * Partition ts=30: price = 3000, 3500
-           *
-           * This will initialize the partition stats with bounds like [1000, 
1500], [2000, 2500], and [3000, 3500].
-           */
-          spark.sql(s"insert into $tableName values (1, 'a1', 1000, 10), (2, 
'a2', 1500, 10)")
-          spark.sql(s"insert into $tableName values (3, 'a3', 2000, 20), (4, 
'a4', 2500, 20)")
-          spark.sql(s"insert into $tableName values (5, 'a5', 3000, 30), (6, 
'a6', 3500, 30)")
-
-          // validate partition stats initialization
-          checkAnswer(s"select key, 
ColumnStatsMetadata.minValue.member1.value, 
ColumnStatsMetadata.maxValue.member1.value from hudi_metadata('$tableName') 
where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and 
ColumnStatsMetadata.columnName='price'")(
-            Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 1500),
-            Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 2500),
-            Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 3500)
-          )
+        /**
+         * Insert: Insert values for price across multiple partitions (ts=10, ts=20, ts=30):
+         *
+         * Partition ts=10: price = 1000, 1500
+         * Partition ts=20: price = 2000, 2500
+         * Partition ts=30: price = 3000, 3500
+         *
+         * This will initialize the partition stats with bounds like [1000, 1500], [2000, 2500], and [3000, 3500].
+         */
+        spark.sql(s"insert into $tableName values (1, 'a1', 1000, 10), (2, 'a2', 1500, 10)")
+        spark.sql(s"insert into $tableName values (3, 'a3', 2000, 20), (4, 'a4', 2500, 20)")
+        spark.sql(s"insert into $tableName values (5, 'a5', 3000, 30), (6, 'a6', 3500, 30)")
+
+        // validate partition stats initialization
+        checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, ColumnStatsMetadata.maxValue.member1.value from hudi_metadata('$tableName') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and ColumnStatsMetadata.columnName='price'")(
+          Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 1500),
+          Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 2500),
+          Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 3500)
+        )
 
-          // First Update (widen the bounds): Update the price in partition ts=30, where price = 4000 for id=6.
-          //                                  This will widen the max bounds in ts=30 from 3500 to 4000.
-          spark.sql(s"update $tableName set price = 4000 where id = 6")
-          // Validate widened stats
-          checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound from hudi_metadata('$tableName') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and ColumnStatsMetadata.columnName='price'")(
-            Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 1500, true),
-            Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 2500, true),
-            // for COW table, that stats are consolidated on every write as there is a new slice for the same filegroup
-            Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 4000, isPartitionStatsIndexConsolidationEnabledOnEveryWrite.toBoolean || tableType == "cow")
-          )
-          // verify file pruning
-          var metaClient = HoodieTableMetaClient.builder()
-            .setBasePath(tablePath)
-            .setConf(HoodieTestUtils.getDefaultStorageConf)
-            .build()
-          verifyFilePruning(Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", HoodieMetadataConfig.ENABLE.key -> "true"),
-            GreaterThan(AttributeReference("price", IntegerType)(), Literal(3000)), metaClient, isDataSkippingExpected = true)
-
-          // Second update (reduce max value)
-          spark.sql(s"delete from $tableName where id = 6")
-          // Validate that stats have recomputed and tightened
-          checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound from hudi_metadata('$tableName') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and ColumnStatsMetadata.columnName='price'")(
-            Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 1500, true),
-            Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 2500, true),
-            Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 3000, true) // tighter bound, note that record with prev max was deleted
-          )
-          // verify file pruning
-          metaClient = HoodieTableMetaClient.reload(metaClient)
-          verifyFilePruning(Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", HoodieMetadataConfig.ENABLE.key -> "true"),
-            GreaterThan(AttributeReference("price", IntegerType)(), Literal(3000)), metaClient, isDataSkippingExpected = true, isNoScanExpected = true)
-        }
+        // First Update (widen the bounds): Update the price in partition ts=30, where price = 4000 for id=6.
+        //                                  This will widen the max bounds in ts=30 from 3500 to 4000.
+        spark.sql(s"update $tableName set price = 4000 where id = 6")
+        // Validate widened stats
+        checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound from hudi_metadata('$tableName') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and ColumnStatsMetadata.columnName='price'")(
+          Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 1500, true),
+          Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 2500, true),
+          Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 4000, true)
+        )
+        // verify file pruning
+        var metaClient = HoodieTableMetaClient.builder()
+          .setBasePath(tablePath)
+          .setConf(HoodieTestUtils.getDefaultStorageConf)
+          .build()
+        verifyFilePruning(Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", HoodieMetadataConfig.ENABLE.key -> "true"),
+          GreaterThan(AttributeReference("price", IntegerType)(), Literal(3000)), metaClient, isDataSkippingExpected = true)
+
+        // Second update (reduce max value)
+        spark.sql(s"delete from $tableName where id = 6")
+        // Validate that stats have recomputed and tightened
+        // if tighter bound, note that record with prev max was deleted
+        checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound from hudi_metadata('$tableName') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and ColumnStatsMetadata.columnName='price'")(
+          Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 1500, true),
+          Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 2500, true),
+          Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 3000, true)
+        )
+        // verify file pruning
+        metaClient = HoodieTableMetaClient.reload(metaClient)
+        verifyFilePruning(Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", HoodieMetadataConfig.ENABLE.key -> "true"),
+          GreaterThan(AttributeReference("price", IntegerType)(), Literal(3000)), metaClient, isDataSkippingExpected = true, isNoScanExpected = true)
       }
     }
   }
@@ -447,7 +444,7 @@ class TestPartitionStatsIndexWithSql extends HoodieSparkSqlTestBase {
       checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound from hudi_metadata('$tableName') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and ColumnStatsMetadata.columnName='price'")(
         Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 2000, true),
         Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 3000, true),
-        Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 4003, false)
+        Seq(getPartitionStatsIndexKey("ts=30", "price"), 4003, 4003, true)
       )
     }
   }
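
To make the tight-bound assertions concrete: once the row holding the previous
maximum (price = 4000) is deleted, recomputing from the latest file slices must
shrink the ts=30 range from [3000, 4000] to [3000, 3000]. A toy Java illustration
of that recompute, with values taken from the test above:

    import java.util.Arrays;
    import java.util.List;

    class TightBoundSketch {
      public static void main(String[] args) {
        // Prices remaining in partition ts=30 after the price=4000 row is deleted.
        List<Integer> prices = Arrays.asList(3000);
        int min = prices.stream().min(Integer::compareTo).get();
        int max = prices.stream().max(Integer::compareTo).get();
        System.out.println("ts=30 price range: [" + min + ", " + max + "]"); // [3000, 3000]
      }
    }
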
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index a232ca32324..488207adf8c 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -407,9 +407,6 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
     writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
     writeOptions.put("hoodie.table.name", "test_table");
     writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), tableType);
-    if (tableType.equals("COPY_ON_WRITE")) {
-      writeOptions.put(HoodieMetadataConfig.PARTITION_STATS_INDEX_CONSOLIDATE_ON_EVERY_WRITE.key(), "true");
-    }
     writeOptions.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
     writeOptions.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
     writeOptions.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition_path");
