This is an automated email from the ASF dual-hosted git repository.
sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 070c600d79c [HUDI-8348] Whenever any new file slice is added, do the
full computation of stats for the partitions touched (#12121)
070c600d79c is described below
commit 070c600d79ce5714c8787d423d913615add60fcc
Author: Lokesh Jain <[email protected]>
AuthorDate: Sat Oct 19 20:13:22 2024 +0530
[HUDI-8348] Whenever any new file slice is added, do the full computation
of stats for the partitions touched (#12121)
* [HUDI-8348] Whenever any new file slice is added, do the full computation
of stats for the partitions touched
---------
Co-authored-by: sivabalan <[email protected]>
Co-authored-by: Sagar Sumit <[email protected]>
---
.../hudi/common/config/HoodieMetadataConfig.java | 20 ++--
.../hudi/metadata/HoodieTableMetadataUtil.java | 29 +++++-
.../TestPartitionStatsIndexWithSql.scala | 101 ++++++++++++++++++++-
.../utilities/HoodieMetadataTableValidator.java | 4 +-
.../TestHoodieMetadataTableValidator.java | 10 +-
5 files changed, 144 insertions(+), 20 deletions(-)
diff --git
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index c84723db8ed..332c26f6908 100644
---
a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++
b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -358,14 +358,14 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
.sinceVersion("1.0.0")
.withDocumentation("Parallelism to use, when generating partition stats
index.");
- public static final ConfigProperty<Boolean>
ENABLE_PARTITION_STATS_INDEX_TIGHT_BOUND = ConfigProperty
- .key(METADATA_PREFIX + ".index.partition.stats.tightBound.enable")
- .defaultValue(true)
+ public static final ConfigProperty<Boolean>
PARTITION_STATS_INDEX_CONSOLIDATE_ON_EVERY_WRITE = ConfigProperty
+ .key(METADATA_PREFIX +
".index.partition.stats.consolidate.on.every.write")
+ .defaultValue(false)
.sinceVersion("1.0.0")
- .withDocumentation("Enable tight bound for the min/max value for each
column at the storage partition level. "
- + "Typically, the min/max range for each column can become wider
(i.e. the minValue is <= all valid values "
+ .withDocumentation("When enabled, partition stats consolidation is
computed on every commit for the min/max value of every column "
+ + "at the storage partition level. Typically, the min/max range for
each column can become wider (i.e. the minValue is <= all valid values "
+ "in the file, and the maxValue >= all valid values in the file)
with updates and deletes. If this config is enabled, "
- + "the min/max range will be updated to the tight bound of the valid
values in the latest snapshot of the partition.");
+ + "the min/max range will be updated to the tight bound of the valid
values after every commit for the partitions touched.");
public static final ConfigProperty<Boolean> SECONDARY_INDEX_ENABLE_PROP =
ConfigProperty
.key(METADATA_PREFIX + ".index.secondary.enable")
@@ -534,8 +534,8 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
return getInt(PARTITION_STATS_INDEX_PARALLELISM);
}
- public boolean isPartitionStatsIndexTightBoundEnabled() {
- return getBooleanOrDefault(ENABLE_PARTITION_STATS_INDEX_TIGHT_BOUND);
+ public boolean isPartitionStatsIndexConsolidationEnabledOnEveryWrite() {
+ return
getBooleanOrDefault(PARTITION_STATS_INDEX_CONSOLIDATE_ON_EVERY_WRITE);
}
public boolean isSecondaryIndexEnabled() {
@@ -749,8 +749,8 @@ public final class HoodieMetadataConfig extends
HoodieConfig {
return this;
}
- public Builder withPartitionStatsIndexTightBound(boolean enable) {
- metadataConfig.setValue(ENABLE_PARTITION_STATS_INDEX_TIGHT_BOUND,
String.valueOf(enable));
+ public Builder
withPartitionStatsIndexConsolidationEnabledOnEveryWrite(boolean enable) {
+
metadataConfig.setValue(PARTITION_STATS_INDEX_CONSOLIDATE_ON_EVERY_WRITE,
String.valueOf(enable));
return this;
}
diff --git
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index 21d7518669d..68042be8cfd 100644
---
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -2201,7 +2201,7 @@ public class HoodieTableMetadataUtil {
int parallelism = Math.max(Math.min(partitionedWriteStats.size(),
metadataConfig.getPartitionStatsIndexParallelism()), 1);
boolean shouldScanColStatsForTightBound =
MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(dataMetaClient)
- && metadataConfig.isPartitionStatsIndexTightBoundEnabled() &&
WriteOperationType.isPartitionStatsTightBoundRequired(commitMetadata.getOperationType());
+ &&
(metadataConfig.isPartitionStatsIndexConsolidationEnabledOnEveryWrite() ||
shouldConsolidatePartitionStats(commitMetadata, dataMetaClient));
HoodieTableMetadata tableMetadata;
if (shouldScanColStatsForTightBound) {
tableMetadata = HoodieTableMetadata.create(engineContext,
dataMetaClient.getStorage(), metadataConfig,
dataMetaClient.getBasePath().toString());
@@ -2218,7 +2218,7 @@ public class HoodieTableMetadataUtil {
checkState(tableMetadata != null, "tableMetadata should not be null
when scanning metadata table");
// Collect Column Metadata for Each File part of active file system
view of latest snapshot
// Get all file names, including log files, in a set from the file
slices
- Set<String> fileNames = getPartitionFileSlices(dataMetaClient,
Option.empty(), partitionName, true).stream()
+ Set<String> fileNames =
getPartitionLatestFileSlicesIncludingInflight(dataMetaClient, Option.empty(),
partitionName).stream()
.flatMap(fileSlice -> Stream.concat(
Stream.of(fileSlice.getBaseFile().map(HoodieBaseFile::getFileName).orElse(null)),
fileSlice.getLogFiles().map(HoodieLogFile::getFileName)))
@@ -2246,6 +2246,31 @@ public class HoodieTableMetadataUtil {
}
}
+ private static boolean shouldConsolidatePartitionStats(HoodieCommitMetadata
commitMetadata, HoodieTableMetaClient metaClient) {
+ if
(WriteOperationType.isPartitionStatsTightBoundRequired(commitMetadata.getOperationType()))
{
+ return true;
+ }
+ // if not for compaction or clustering, let's check for any new base file
added.
+ HoodieTableFileSystemView fsv = getFileSystemView(metaClient);
+ try {
+ fsv.loadPartitions(new
ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet()));
+ // Collect already existing filegroupIds.
+ Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
+ commitMetadata.getPartitionToWriteStats().keySet().forEach(partition -> {
+ partitionToExistingFileIds.put(partition,
fsv.getLatestBaseFiles(partition).map(baseFile ->
baseFile.getFileId()).collect(Collectors.toList()));
+ });
+ // check if new base file is added to any of existing file groups.
+ // as of now, if any one partition has a new base file added, we are
triggering tighter bound computation for all partitions touched in this commit.
+ return
commitMetadata.getPartitionToWriteStats().entrySet().stream().filter(entry -> {
+ List<String> existingFileIds =
partitionToExistingFileIds.get(entry.getKey());
+ return entry.getValue().stream().filter(writeStat ->
writeStat.getPath().contains(".parquet"))
+ .filter(writeStat ->
existingFileIds.contains(writeStat.getFileId())).findAny().isPresent();
+ }).findAny().isPresent();
+ } finally {
+ fsv.close();
+ }
+ }
+
/**
* Generate key prefixes for each combination of column name in {@param
columnsToIndex} and {@param partitionName}.
*/
diff --git
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
index 338600b8204..05b480c3542 100644
---
a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
+++
b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
@@ -278,6 +278,99 @@ class TestPartitionStatsIndexWithSql extends
HoodieSparkSqlTestBase {
}
}
+ /**
+ * Test the partition stats consolidation:
+ *
+ * 1. Insert values: Initially insert records with values that create wide
bounds for the price field across different partitions.
+ * 2. Update to widen the bounds: Perform updates to increase the price
values, causing the min/max stats to widen.
+ * 3. Update to remove or lower the max value: Delete or update the record
that holds the max value to simulate a
+ * scenario where the bounds need to be tightened.
+ * 4. Trigger stats recomputation: Assuming the config for tighter bounds is
enabled, validate that the partition stats
+ * have adjusted correctly after scanning and recomputing accurate stats.
+ */
+ test("Test partition stats index with tight bound") {
+ Seq("cow", "mor").foreach { tableType =>
+ Seq("true", "false").foreach {
isPartitionStatsIndexConsolidationEnabledOnEveryWrite =>
+ withTempDir { tmp =>
+ val tableName = generateTableName +
s"_tight_bound_$isPartitionStatsIndexConsolidationEnabledOnEveryWrite"
+ val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price int,
+ | ts long
+ |) using hudi
+ |partitioned by (ts)
+ |tblproperties (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'price',
+ | hoodie.metadata.index.partition.stats.enable = 'true',
+ |
hoodie.metadata.index.partition.stats.consolidate.on.every.write =
'$isPartitionStatsIndexConsolidationEnabledOnEveryWrite',
+ | hoodie.metadata.index.column.stats.enable = 'true',
+ | hoodie.metadata.index.column.stats.column.list = 'price'
+ |)
+ |location '$tablePath'
+ |""".stripMargin
+ )
+
+ /**
+ * Insert: Insert values for price across multiple partitions
(ts=10, ts=20, ts=30):
+ *
+ * Partition ts=10: price = 1000, 1500
+ * Partition ts=20: price = 2000, 2500
+ * Partition ts=30: price = 3000, 3500
+ *
+ * This will initialize the partition stats with bounds like [1000,
1500], [2000, 2500], and [3000, 3500].
+ */
+ spark.sql(s"insert into $tableName values (1, 'a1', 1000, 10), (2,
'a2', 1500, 10)")
+ spark.sql(s"insert into $tableName values (3, 'a3', 2000, 20), (4,
'a4', 2500, 20)")
+ spark.sql(s"insert into $tableName values (5, 'a5', 3000, 30), (6,
'a6', 3500, 30)")
+
+ // validate partition stats initialization
+ checkAnswer(s"select key,
ColumnStatsMetadata.minValue.member1.value,
ColumnStatsMetadata.maxValue.member1.value from hudi_metadata('$tableName')
where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and
ColumnStatsMetadata.columnName='price'")(
+ Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 1500),
+ Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 2500),
+ Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 3500)
+ )
+
+ // First Update (widen the bounds): Update the price in partition
ts=30, where price = 4000 for id=6.
+ // This will widen the max bounds
in ts=30 from 3500 to 4000.
+ spark.sql(s"update $tableName set price = 4000 where id = 6")
+ // Validate widened stats
+ checkAnswer(s"select key,
ColumnStatsMetadata.minValue.member1.value,
ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound
from hudi_metadata('$tableName') where
type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and
ColumnStatsMetadata.columnName='price'")(
+ Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 1500, true),
+ Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 2500, true),
+ // for COW table, the stats are consolidated on every write as
there is a new slice for the same filegroup
+ Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 4000,
isPartitionStatsIndexConsolidationEnabledOnEveryWrite.toBoolean || tableType ==
"cow")
+ )
+ // verify file pruning
+ var metaClient = HoodieTableMetaClient.builder()
+ .setBasePath(tablePath)
+ .setConf(HoodieTestUtils.getDefaultStorageConf)
+ .build()
+
verifyFilePruning(Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key ->
"true", HoodieMetadataConfig.ENABLE.key -> "true"),
+ GreaterThan(AttributeReference("price", IntegerType)(),
Literal(3000)), metaClient, isDataSkippingExpected = true)
+
+ // Second update (reduce max value)
+ spark.sql(s"delete from $tableName where id = 6")
+ // Validate that stats have recomputed and tightened
+ checkAnswer(s"select key,
ColumnStatsMetadata.minValue.member1.value,
ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound
from hudi_metadata('$tableName') where
type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and
ColumnStatsMetadata.columnName='price'")(
+ Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 1500, true),
+ Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 2500, true),
+ Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 3000, true)
// tighter bound, note that record with prev max was deleted
+ )
+ // verify file pruning
+ metaClient = HoodieTableMetaClient.reload(metaClient)
+
verifyFilePruning(Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key ->
"true", HoodieMetadataConfig.ENABLE.key -> "true"),
+ GreaterThan(AttributeReference("price", IntegerType)(),
Literal(3000)), metaClient, isDataSkippingExpected = true, isNoScanExpected =
true)
+ }
+ }
+ }
+ }
+
/**
* 1. Create MOR table with compaction enabled.
* 2. Do an insert and validate the partition stats index initialization.
@@ -352,8 +445,8 @@ class TestPartitionStatsIndexWithSql extends
HoodieSparkSqlTestBase {
// validate partition stats index
checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value,
ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound
from hudi_metadata('$tableName') where
type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and
ColumnStatsMetadata.columnName='price'")(
- Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 2000, false),
- Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 3000, false),
+ Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 2000, true),
+ Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 3000, true),
Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 4003, false)
)
}
@@ -398,8 +491,8 @@ class TestPartitionStatsIndexWithSql extends
HoodieSparkSqlTestBase {
if (tableType == "mor" && shouldCompact) {
// check partition stats records with tightBound
checkAnswer(s"select key,
ColumnStatsMetadata.minValue.member1.value,
ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound
from hudi_metadata('$tableName') where
type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and
ColumnStatsMetadata.columnName='price'")(
- Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 2000,
false),
- Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 3000,
false),
+ Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 2000,
true),
+ Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 3000,
true),
Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 4001,
true)
)
}
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
index 4cee0815e46..5459e3359a2 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/HoodieMetadataTableValidator.java
@@ -980,7 +980,9 @@ public class HoodieMetadataTableValidator implements
Serializable {
AvroConversionUtils.convertAvroSchemaToStructType(metadataTableBasedContext.getSchema()),
metadataTableBasedContext.getMetadataConfig(),
metaClient, false);
HoodieData<HoodieMetadataColumnStats> partitionStats =
-
partitionStatsIndexSupport.loadColumnStatsIndexRecords(JavaConverters.asScalaBufferConverter(metadataTableBasedContext.allColumnNameList).asScala().toSeq(),
scala.Option.empty(), false);
+
partitionStatsIndexSupport.loadColumnStatsIndexRecords(JavaConverters.asScalaBufferConverter(metadataTableBasedContext.allColumnNameList).asScala().toSeq(),
scala.Option.empty(), false)
+ // set isTightBound to false since partition stats generated using
column stats does not contain the field
+ .map(colStat ->
HoodieMetadataColumnStats.newBuilder(colStat).setIsTightBound(false).build());
JavaRDD<HoodieMetadataColumnStats> diffRDD =
HoodieJavaRDD.getJavaRDD(partitionStats).subtract(HoodieJavaRDD.getJavaRDD(partitionStatsUsingColStats));
if (!diffRDD.isEmpty()) {
List<HoodieMetadataColumnStats> diff = diffRDD.collect();
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index e0b0255a6f8..9fad1385400 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -288,13 +288,17 @@ public class TestHoodieMetadataTableValidator extends
HoodieSparkClientTestBase
return rows;
}
- @Test
- public void testPartitionStatsValidation() {
+ @ParameterizedTest
+ @ValueSource(strings = {"MERGE_ON_READ", "COPY_ON_WRITE"})
+ public void testPartitionStatsValidation(String tableType) {
// TODO: Add validation for compaction and clustering cases
Map<String, String> writeOptions = new HashMap<>();
writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
writeOptions.put("hoodie.table.name", "test_table");
- writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(),
"MERGE_ON_READ");
+ writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), tableType);
+ if (tableType.equals("COPY_ON_WRITE")) {
+
writeOptions.put(HoodieMetadataConfig.PARTITION_STATS_INDEX_CONSOLIDATE_ON_EVERY_WRITE.key(),
"true");
+ }
writeOptions.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(),
"_row_key");
writeOptions.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(),
"timestamp");
writeOptions.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(),
"partition_path");