This is an automated email from the ASF dual-hosted git repository.
codope pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
     new 5e1ebe7fb91 [HUDI-8452] Init partition stats based on latest file slices only (#12174)
5e1ebe7fb91 is described below
commit 5e1ebe7fb9167bac5534c2da719341de820320b1
Author: Sagar Sumit <[email protected]>
AuthorDate: Wed Nov 13 15:45:40 2024 +0530
[HUDI-8452] Init partition stats based on latest file slices only (#12174)
Initialize partition stats based on latest file slices only instead of all files in a partition.
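In short: for each partition, only the files belonging to the latest file slices (the base file plus its log files) feed the initial partition stats. A minimal, self-contained sketch of that idea, using a hypothetical stand-in type rather than Hudi's own FileSlice:

import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;

// Editor's sketch with stand-in types; this FileSlice is hypothetical and
// only mirrors the shape of org.apache.hudi.common.model.FileSlice.
class LatestSliceSketch {
  record FileSlice(Optional<String> baseFile, List<String> logFiles) {}

  // Mirrors the extractFileNames() helper added by this commit: a slice
  // contributes its base file (if present) plus all of its log files.
  static Set<String> fileNamesOf(FileSlice slice) {
    Set<String> names = new HashSet<>();
    slice.baseFile().ifPresent(names::add);
    names.addAll(slice.logFiles());
    return names;
  }

  public static void main(String[] args) {
    FileSlice latest = new FileSlice(Optional.of("f1_0-0-0.parquet"), List.of(".f1_001.log.1"));
    System.out.println(fileNamesOf(latest)); // base file + log files, in set order
  }
}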
---
.../metadata/HoodieBackedTableMetadataWriter.java | 6 +-
.../hudi/common/config/HoodieMetadataConfig.java | 18 ---
.../hudi/metadata/HoodieTableMetadataUtil.java | 57 ++++----
.../hudi/metadata/TestHoodieTableMetadataUtil.java | 22 ++-
.../TestPartitionStatsIndexWithSql.scala | 151 ++++++++++-----------
.../TestHoodieMetadataTableValidator.java | 3 -
6 files changed, 109 insertions(+), 148 deletions(-)
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
index 0e8d5339cd2..bf930ac9fce 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadataWriter.java
@@ -207,7 +207,7 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
   }
 
   private HoodieMetadataFileSystemView getMetadataView() {
-    if (metadataView == null) {
+    if (metadataView == null || !metadataView.equals(metadata.getMetadataFileSystemView())) {
       ValidationUtils.checkState(metadata != null, "Metadata table not initialized");
       ValidationUtils.checkState(dataMetaClient != null, "Data table meta client not initialized");
       metadataView = new HoodieMetadataFileSystemView(dataMetaClient, dataMetaClient.getActiveTimeline(), metadata);
@@ -503,8 +503,8 @@ public abstract class HoodieBackedTableMetadataWriter<I> implements HoodieTableM
     }
   }
 
-  private Pair<Integer, HoodieData<HoodieRecord>> initializePartitionStatsIndex(List<DirectoryInfo> partitionInfoList) {
-    HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(engineContext, partitionInfoList, dataWriteConfig.getMetadataConfig(), dataMetaClient,
+  private Pair<Integer, HoodieData<HoodieRecord>> initializePartitionStatsIndex(List<DirectoryInfo> partitionInfoList) throws IOException {
+    HoodieData<HoodieRecord> records = HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(engineContext, getPartitionFileSlicePairs(), dataWriteConfig.getMetadataConfig(), dataMetaClient,
         Option.of(new Schema.Parser().parse(dataWriteConfig.getWriteSchema())));
     final int fileGroupCount = dataWriteConfig.getMetadataConfig().getPartitionStatsIndexFileGroupCount();
return Pair.of(fileGroupCount, records);
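The getMetadataView() change above swaps a plain null check for a staleness check. A generic sketch of that cache-refresh pattern, assuming nothing Hudi-specific:

import java.util.function.Supplier;

// Editor's sketch: rebuild a cached view when it is absent or no longer
// equal to the view currently backing the source, as getMetadataView() does.
class CachedView<V> {
  private V view;

  V get(Supplier<V> currentBacking, Supplier<V> factory) {
    if (view == null || !view.equals(currentBacking.get())) {
      view = factory.get(); // absent or stale: rebuild
    }
    return view;
  }
}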
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index 332c26f6908..736b21e847a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -358,15 +358,6 @@ public final class HoodieMetadataConfig extends HoodieConfig {
       .sinceVersion("1.0.0")
       .withDocumentation("Parallelism to use, when generating partition stats index.");
 
-  public static final ConfigProperty<Boolean> PARTITION_STATS_INDEX_CONSOLIDATE_ON_EVERY_WRITE = ConfigProperty
-      .key(METADATA_PREFIX + ".index.partition.stats.consolidate.on.every.write")
-      .defaultValue(false)
-      .sinceVersion("1.0.0")
-      .withDocumentation("When enabled, partition stats is consolidated is computed on every commit for the min/max value of every column "
-          + "at the storage partition level. Typically, the min/max range for each column can become wider (i.e. the minValue is <= all valid values "
-          + "in the file, and the maxValue >= all valid values in the file) with updates and deletes. If this config is enabled, "
-          + "the min/max range will be updated to the tight bound of the valid values after every commit for the partitions touched.");
-
   public static final ConfigProperty<Boolean> SECONDARY_INDEX_ENABLE_PROP = ConfigProperty
       .key(METADATA_PREFIX + ".index.secondary.enable")
       .defaultValue(false)
@@ -534,10 +525,6 @@ public final class HoodieMetadataConfig extends HoodieConfig {
     return getInt(PARTITION_STATS_INDEX_PARALLELISM);
   }
 
-  public boolean isPartitionStatsIndexConsolidationEnabledOnEveryWrite() {
-    return getBooleanOrDefault(PARTITION_STATS_INDEX_CONSOLIDATE_ON_EVERY_WRITE);
-  }
-
   public boolean isSecondaryIndexEnabled() {
     // Secondary index is enabled only iff record index (primary key index) is also enabled
     return isRecordIndexEnabled() && getBoolean(SECONDARY_INDEX_ENABLE_PROP);
@@ -749,11 +736,6 @@ public final class HoodieMetadataConfig extends HoodieConfig {
       return this;
     }
 
-    public Builder withPartitionStatsIndexConsolidationEnabledOnEveryWrite(boolean enable) {
-      metadataConfig.setValue(PARTITION_STATS_INDEX_CONSOLIDATE_ON_EVERY_WRITE, String.valueOf(enable));
-      return this;
-    }
-
    public HoodieMetadataConfig build() {
      metadataConfig.setDefaultValue(ENABLE, getDefaultMetadataEnable(engineType));
      metadataConfig.setDefaults(HoodieMetadataConfig.class.getName());
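With the consolidate-on-every-write flag removed, enabling the partition stats index comes down to the two index toggles. A minimal builder sketch using only calls that appear elsewhere in this patch:

import org.apache.hudi.common.config.HoodieMetadataConfig;

class MetadataConfigSketch {
  static HoodieMetadataConfig partitionStatsConfig() {
    return HoodieMetadataConfig.newBuilder()
        .enable(true)
        .withMetadataIndexColumnStats(true)     // column stats availability now gates tight bounds
        .withMetadataIndexPartitionStats(true)
        .build();
  }
}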
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
index bcd54931d88..994ec6708bd 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java
@@ -61,7 +61,6 @@ import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
 import org.apache.hudi.common.model.HoodieRecordGlobalLocation;
 import org.apache.hudi.common.model.HoodieRecordMerger;
 import org.apache.hudi.common.model.HoodieWriteStat;
-import org.apache.hudi.common.model.WriteOperationType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.TableSchemaResolver;
@@ -2159,7 +2158,7 @@ public class HoodieTableMetadataUtil {
   }
 
   public static HoodieData<HoodieRecord> convertFilesToPartitionStatsRecords(HoodieEngineContext engineContext,
-                                                                             List<DirectoryInfo> partitionInfoList,
+                                                                             List<Pair<String, FileSlice>> partitionInfoList,
                                                                              HoodieMetadataConfig metadataConfig,
                                                                              HoodieTableMetaClient dataTableMetaClient,
                                                                              Option<Schema> writerSchemaOpt) {
@@ -2173,12 +2172,22 @@ public class HoodieTableMetadataUtil {
       return engineContext.emptyHoodieData();
     }
 
     LOG.debug("Indexing following columns for partition stats index: {}", columnsToIndex);
+
+    // Group by partition path and collect file names (BaseFile and LogFiles)
+    List<Pair<String, Set<String>>> partitionToFileNames = partitionInfoList.stream()
+        .collect(Collectors.groupingBy(Pair::getLeft,
+            Collectors.mapping(pair -> extractFileNames(pair.getRight()), Collectors.toList())))
+        .entrySet().stream()
+        .map(entry -> Pair.of(entry.getKey(),
+            entry.getValue().stream().flatMap(Set::stream).collect(Collectors.toSet())))
+        .collect(Collectors.toList());
+
     // Create records for MDT
-    int parallelism = Math.max(Math.min(partitionInfoList.size(), metadataConfig.getPartitionStatsIndexParallelism()), 1);
-    return engineContext.parallelize(partitionInfoList, parallelism).flatMap(partitionInfo -> {
-      final String partitionPath = partitionInfo.getRelativePath();
+    int parallelism = Math.max(Math.min(partitionToFileNames.size(), metadataConfig.getPartitionStatsIndexParallelism()), 1);
+    return engineContext.parallelize(partitionToFileNames, parallelism).flatMap(partitionInfo -> {
+      final String partitionPath = partitionInfo.getKey();
       // Step 1: Collect Column Metadata for Each File
-      List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata = partitionInfo.getFileNameToSizeMap().keySet().stream()
+      List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata = partitionInfo.getValue().stream()
          .map(fileName -> getFileStatsRangeMetadata(partitionPath, fileName, dataTableMetaClient, columnsToIndex, false, metadataConfig.getMaxReaderBufferSize()))
          .collect(Collectors.toList());
@@ -2187,6 +2196,14 @@ public class HoodieTableMetadataUtil {
     });
   }
 
+  private static Set<String> extractFileNames(FileSlice fileSlice) {
+    Set<String> fileNames = new HashSet<>();
+    Option<HoodieBaseFile> baseFile = fileSlice.getBaseFile();
+    baseFile.ifPresent(hoodieBaseFile -> fileNames.add(hoodieBaseFile.getFileName()));
+    fileSlice.getLogFiles().forEach(hoodieLogFile -> fileNames.add(hoodieLogFile.getFileName()));
+    return fileNames;
+  }
+
   private static List<HoodieColumnRangeMetadata<Comparable>> getFileStatsRangeMetadata(String partitionPath,
                                                                                        String fileName,
                                                                                        HoodieTableMetaClient datasetMetaClient,
@@ -2234,8 +2251,7 @@ public class HoodieTableMetadataUtil {
         .collect(Collectors.toList());
 
     int parallelism = Math.max(Math.min(partitionedWriteStats.size(), metadataConfig.getPartitionStatsIndexParallelism()), 1);
-    boolean shouldScanColStatsForTightBound = MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(dataMetaClient)
-        && (metadataConfig.isPartitionStatsIndexConsolidationEnabledOnEveryWrite() || shouldConsolidatePartitionStats(commitMetadata, dataMetaClient));
+    boolean shouldScanColStatsForTightBound = MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(dataMetaClient);
     HoodieTableMetadata tableMetadata;
     if (shouldScanColStatsForTightBound) {
       tableMetadata = HoodieTableMetadata.create(engineContext, dataMetaClient.getStorage(), metadataConfig, dataMetaClient.getBasePath().toString());
@@ -2280,31 +2296,6 @@ public class HoodieTableMetadataUtil {
     }
   }
 
-  private static boolean shouldConsolidatePartitionStats(HoodieCommitMetadata commitMetadata, HoodieTableMetaClient metaClient) {
-    if (WriteOperationType.isPartitionStatsTightBoundRequired(commitMetadata.getOperationType())) {
-      return true;
-    }
-    // if not for compaction or clustering, lets check for any new base file added.
-    HoodieTableFileSystemView fsv = getFileSystemView(metaClient);
-    try {
-      fsv.loadPartitions(new ArrayList<>(commitMetadata.getPartitionToWriteStats().keySet()));
-      // Collect already existing filegroupIds.
-      Map<String, List<String>> partitionToExistingFileIds = new HashMap<>();
-      commitMetadata.getPartitionToWriteStats().keySet().forEach(partition -> {
-        partitionToExistingFileIds.put(partition, fsv.getLatestBaseFiles(partition).map(baseFile -> baseFile.getFileId()).collect(Collectors.toList()));
-      });
-      // check if new base file is added to any of existing file groups.
-      // as of now, if any one partition has a new base file added, we are triggering tighter bound computation for all partitions touched in this commit.
-      return commitMetadata.getPartitionToWriteStats().entrySet().stream().filter(entry -> {
-        List<String> existingFileIds = partitionToExistingFileIds.get(entry.getKey());
-        return entry.getValue().stream().filter(writeStat -> writeStat.getPath().contains(".parquet"))
-            .filter(writeStat -> existingFileIds.contains(writeStat.getFileId())).findAny().isPresent();
-      }).findAny().isPresent();
-    } finally {
-      fsv.close();
-    }
-  }
-
   /**
    * Generate key prefixes for each combination of column name in {@param columnsToIndex} and {@param partitionName}.
    */
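The grouping added to convertFilesToPartitionStatsRecords can be sketched in isolation with java.util stand-ins for Hudi's Pair and FileSlice (the real code operates on Pair<String, FileSlice>):

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Editor's sketch: union the per-slice file-name sets into one set per
// partition, following the same two-phase groupingBy/flatMap shape as above.
class GroupByPartitionSketch {
  public static void main(String[] args) {
    List<Map.Entry<String, Set<String>>> sliceFiles = List.of(
        Map.entry("ts=10", Set.of("f1.parquet")),
        Map.entry("ts=10", Set.of("f2.parquet", ".f2.log.1")),
        Map.entry("ts=20", Set.of("f3.parquet")));

    List<Map.Entry<String, Set<String>>> partitionToFileNames = sliceFiles.stream()
        .collect(Collectors.groupingBy(Map.Entry::getKey,
            Collectors.mapping(Map.Entry::getValue, Collectors.toList())))
        .entrySet().stream()
        .map(e -> Map.entry(e.getKey(),
            e.getValue().stream().flatMap(Set::stream).collect(Collectors.toSet())))
        .collect(Collectors.toList());

    // ts=10 maps to the union of both slices' files; ts=20 to f3.parquet only.
    partitionToFileNames.forEach(System.out::println);
  }
}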
diff --git a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
index 9586171d97a..32e0d2252a9 100644
--- a/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
+++ b/hudi-hadoop-common/src/test/java/org/apache/hudi/metadata/TestHoodieTableMetadataUtil.java
@@ -103,7 +103,7 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
     hoodieTestTable = hoodieTestTable.addCommit(instant1);
     String instant2 = "20230918121110000";
     hoodieTestTable = hoodieTestTable.addCommit(instant2);
-    List<HoodieTableMetadataUtil.DirectoryInfo> partitionInfoList = new ArrayList<>();
+    List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
     // Generate 10 inserts for each partition and populate partitionBaseFilePairs and recordKeys.
     DATE_PARTITIONS.forEach(p -> {
       try {
@@ -131,11 +131,8 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
             engineContext);
         HoodieBaseFile baseFile2 = new HoodieBaseFile(hoodieTestTable.getBaseFilePath(p, fileId2).toString());
         fileSlice2.setBaseFile(baseFile2);
-        partitionInfoList.add(new HoodieTableMetadataUtil.DirectoryInfo(
-            p,
-            metaClient.getStorage().listDirectEntries(Arrays.asList(partitionMetadataPath, storagePath1, storagePath2)),
-            instant2,
-            Collections.emptySet()));
+        partitionFileSlicePairs.add(Pair.of(p, fileSlice1));
+        partitionFileSlicePairs.add(Pair.of(p, fileSlice2));
       } catch (Exception e) {
         throw new RuntimeException(e);
       }
@@ -144,7 +141,7 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
     List<String> columnsToIndex = Arrays.asList("rider", "driver");
     HoodieData<HoodieRecord> result = HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(
         engineContext,
-        partitionInfoList,
+        partitionFileSlicePairs,
         HoodieMetadataConfig.newBuilder().enable(true)
             .withMetadataIndexColumnStats(true)
             .withMetadataIndexPartitionStats(true)
@@ -216,7 +213,7 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
     hoodieTestTable = hoodieTestTable.addCommit(instant1, Option.of(commitMetadata));
     String instant2 = "20230918121110000";
     hoodieTestTable = hoodieTestTable.addCommit(instant2);
-    List<HoodieTableMetadataUtil.DirectoryInfo> partitionInfoList = new ArrayList<>();
+    List<Pair<String, FileSlice>> partitionFileSlicePairs = new ArrayList<>();
     List<String> columnsToIndex = Arrays.asList("rider", "driver");
     // Generate 10 inserts for each partition and populate partitionBaseFilePairs and recordKeys.
     DATE_PARTITIONS.forEach(p -> {
@@ -237,11 +234,8 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
         writeLogFiles(new StoragePath(metaClient.getBasePath(), p), HoodieTestDataGenerator.AVRO_SCHEMA_WITH_METADATA_FIELDS,
             dataGen.generateInsertsForPartition(instant2, 10, p), 1, metaClient.getStorage(), new Properties(), fileId1, instant2);
         fileSlice2.addLogFile(new HoodieLogFile(storagePath2.toUri().toString()));
-        partitionInfoList.add(new HoodieTableMetadataUtil.DirectoryInfo(
-            p,
-            metaClient.getStorage().listDirectEntries(Arrays.asList(partitionMetadataPath, storagePath1, storagePath2)),
-            instant2,
-            Collections.emptySet()));
+        partitionFileSlicePairs.add(Pair.of(p, fileSlice1));
+        partitionFileSlicePairs.add(Pair.of(p, fileSlice2));
         // NOTE: we need to set table config as we are not using write client explicitly and these configs are needed for log record reader
         metaClient.getTableConfig().setValue(HoodieTableConfig.POPULATE_META_FIELDS.key(), "false");
         metaClient.getTableConfig().setValue(HoodieTableConfig.RECORDKEY_FIELDS.key(), "_row_key");
@@ -261,7 +255,7 @@ public class TestHoodieTableMetadataUtil extends HoodieCommonTestHarness {
     // collect partition stats, this will collect stats for log files as well
     HoodieData<HoodieRecord> result = HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(
         engineContext,
-        partitionInfoList,
+        partitionFileSlicePairs,
         HoodieMetadataConfig.newBuilder().enable(true)
            .withMetadataIndexColumnStats(true)
            .withMetadataIndexPartitionStats(true)
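Caller-side, the tests above now exercise the new signature; a compile-level sketch (import paths taken from or inferred from this patch, assuming a Hudi dependency on the classpath):

import java.util.List;

import org.apache.avro.Schema;
import org.apache.hudi.common.config.HoodieMetadataConfig;
import org.apache.hudi.common.data.HoodieData;
import org.apache.hudi.common.engine.HoodieEngineContext;
import org.apache.hudi.common.model.FileSlice;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.metadata.HoodieTableMetadataUtil;

// Editor's sketch: the util now takes (partitionPath, FileSlice) pairs
// instead of DirectoryInfo, so callers pass the latest slices per partition.
class PartitionStatsCallSketch {
  static HoodieData<HoodieRecord> toPartitionStatsRecords(HoodieEngineContext engineContext,
                                                          List<Pair<String, FileSlice>> partitionFileSlicePairs,
                                                          HoodieMetadataConfig metadataConfig,
                                                          HoodieTableMetaClient dataMetaClient,
                                                          Schema writerSchema) {
    return HoodieTableMetadataUtil.convertFilesToPartitionStatsRecords(
        engineContext, partitionFileSlicePairs, metadataConfig, dataMetaClient, Option.of(writerSchema));
  }
}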
diff --git a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
index 05b480c3542..f585b720fcd 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
+++ b/hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala
@@ -285,88 +285,85 @@ class TestPartitionStatsIndexWithSql extends HoodieSparkSqlTestBase {
    * 2. Update to widen the bounds: Perform updates to increase the price values, causing the min/max stats to widen.
    * 3. Update to remove or lower the max value: Delete or update the record that holds the max value to simulate a
    * scenario where the bounds need to be tightened.
-   * 4. Trigger stats recomputation: Assuming the config for tighter bounds is enabled, validate that the partition stats
+   * 4. Trigger stats recomputation: Assuming tighter bounds is enabled on every write, validate that the partition stats
    * have adjusted correctly after scanning and recomputing accurate stats.
    */
  test("Test partition stats index with tight bound") {
    Seq("cow", "mor").foreach { tableType =>
-      Seq("true", "false").foreach { isPartitionStatsIndexConsolidationEnabledOnEveryWrite =>
-        withTempDir { tmp =>
-          val tableName = generateTableName + s"_tight_bound_$isPartitionStatsIndexConsolidationEnabledOnEveryWrite"
-          val tablePath = s"${tmp.getCanonicalPath}/$tableName"
-          spark.sql(
-            s"""
-               |create table $tableName (
-               |  id int,
-               |  name string,
-               |  price int,
-               |  ts long
-               |) using hudi
-               |partitioned by (ts)
-               |tblproperties (
-               |  type = '$tableType',
-               |  primaryKey = 'id',
-               |  preCombineField = 'price',
-               |  hoodie.metadata.index.partition.stats.enable = 'true',
-               |  hoodie.metadata.index.partition.stats.consolidate.on.every.write = '$isPartitionStatsIndexConsolidationEnabledOnEveryWrite',
-               |  hoodie.metadata.index.column.stats.enable = 'true',
-               |  hoodie.metadata.index.column.stats.column.list = 'price'
-               |)
-               |location '$tablePath'
-               |""".stripMargin
-          )
+      withTempDir { tmp =>
+        val tableName = generateTableName + s"_tight_bound_$tableType"
+        val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+        spark.sql(
+          s"""
+             |create table $tableName (
+             |  id int,
+             |  name string,
+             |  price int,
+             |  ts long
+             |) using hudi
+             |partitioned by (ts)
+             |tblproperties (
+             |  type = '$tableType',
+             |  primaryKey = 'id',
+             |  preCombineField = 'price',
+             |  hoodie.metadata.index.partition.stats.enable = 'true',
+             |  hoodie.metadata.index.column.stats.enable = 'true',
+             |  hoodie.metadata.index.column.stats.column.list = 'price'
+             |)
+             |location '$tablePath'
+             |""".stripMargin
+        )
 
-          /**
-           * Insert: Insert values for price across multiple partitions (ts=10, ts=20, ts=30):
-           *
-           * Partition ts=10: price = 1000, 1500
-           * Partition ts=20: price = 2000, 2500
-           * Partition ts=30: price = 3000, 3500
-           *
-           * This will initialize the partition stats with bounds like [1000, 1500], [2000, 2500], and [3000, 3500].
-           */
-          spark.sql(s"insert into $tableName values (1, 'a1', 1000, 10), (2, 'a2', 1500, 10)")
-          spark.sql(s"insert into $tableName values (3, 'a3', 2000, 20), (4, 'a4', 2500, 20)")
-          spark.sql(s"insert into $tableName values (5, 'a5', 3000, 30), (6, 'a6', 3500, 30)")
-
-          // validate partition stats initialization
-          checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, ColumnStatsMetadata.maxValue.member1.value from hudi_metadata('$tableName') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and ColumnStatsMetadata.columnName='price'")(
-            Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 1500),
-            Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 2500),
-            Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 3500)
-          )
+        /**
+         * Insert: Insert values for price across multiple partitions (ts=10, ts=20, ts=30):
+         *
+         * Partition ts=10: price = 1000, 1500
+         * Partition ts=20: price = 2000, 2500
+         * Partition ts=30: price = 3000, 3500
+         *
+         * This will initialize the partition stats with bounds like [1000, 1500], [2000, 2500], and [3000, 3500].
+         */
+        spark.sql(s"insert into $tableName values (1, 'a1', 1000, 10), (2, 'a2', 1500, 10)")
+        spark.sql(s"insert into $tableName values (3, 'a3', 2000, 20), (4, 'a4', 2500, 20)")
+        spark.sql(s"insert into $tableName values (5, 'a5', 3000, 30), (6, 'a6', 3500, 30)")
+
+        // validate partition stats initialization
+        checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, ColumnStatsMetadata.maxValue.member1.value from hudi_metadata('$tableName') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and ColumnStatsMetadata.columnName='price'")(
+          Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 1500),
+          Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 2500),
+          Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 3500)
+        )
 
-          // First Update (widen the bounds): Update the price in partition ts=30, where price = 4000 for id=6.
-          // This will widen the max bounds in ts=30 from 3500 to 4000.
-          spark.sql(s"update $tableName set price = 4000 where id = 6")
-          // Validate widened stats
-          checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound from hudi_metadata('$tableName') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and ColumnStatsMetadata.columnName='price'")(
-            Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 1500, true),
-            Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 2500, true),
-            // for COW table, that stats are consolidated on every write as there is a new slice for the same filegroup
-            Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 4000, isPartitionStatsIndexConsolidationEnabledOnEveryWrite.toBoolean || tableType == "cow")
-          )
-          // verify file pruning
-          var metaClient = HoodieTableMetaClient.builder()
-            .setBasePath(tablePath)
-            .setConf(HoodieTestUtils.getDefaultStorageConf)
-            .build()
-          verifyFilePruning(Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", HoodieMetadataConfig.ENABLE.key -> "true"),
-            GreaterThan(AttributeReference("price", IntegerType)(), Literal(3000)), metaClient, isDataSkippingExpected = true)
-
-          // Second update (reduce max value)
-          spark.sql(s"delete from $tableName where id = 6")
-          // Validate that stats have recomputed and tightened
-          checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound from hudi_metadata('$tableName') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and ColumnStatsMetadata.columnName='price'")(
-            Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 1500, true),
-            Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 2500, true),
-            Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 3000, true) // tighter bound, note that record with prev max was deleted
-          )
-          // verify file pruning
-          metaClient = HoodieTableMetaClient.reload(metaClient)
-          verifyFilePruning(Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", HoodieMetadataConfig.ENABLE.key -> "true"),
-            GreaterThan(AttributeReference("price", IntegerType)(), Literal(3000)), metaClient, isDataSkippingExpected = true, isNoScanExpected = true)
-        }
+        // First Update (widen the bounds): Update the price in partition ts=30, where price = 4000 for id=6.
+        // This will widen the max bounds in ts=30 from 3500 to 4000.
+        spark.sql(s"update $tableName set price = 4000 where id = 6")
+        // Validate widened stats
+        checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound from hudi_metadata('$tableName') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and ColumnStatsMetadata.columnName='price'")(
+          Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 1500, true),
+          Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 2500, true),
+          Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 4000, true)
+        )
+        // verify file pruning
+        var metaClient = HoodieTableMetaClient.builder()
+          .setBasePath(tablePath)
+          .setConf(HoodieTestUtils.getDefaultStorageConf)
+          .build()
+        verifyFilePruning(Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", HoodieMetadataConfig.ENABLE.key -> "true"),
+          GreaterThan(AttributeReference("price", IntegerType)(), Literal(3000)), metaClient, isDataSkippingExpected = true)
+
+        // Second update (reduce max value)
+        spark.sql(s"delete from $tableName where id = 6")
+        // Validate that stats have recomputed and tightened
+        // if tighter bound, note that record with prev max was deleted
+        checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound from hudi_metadata('$tableName') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and ColumnStatsMetadata.columnName='price'")(
+          Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 1500, true),
+          Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 2500, true),
+          Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 3000, true)
+        )
+        // verify file pruning
+        metaClient = HoodieTableMetaClient.reload(metaClient)
+        verifyFilePruning(Map.apply(DataSourceReadOptions.ENABLE_DATA_SKIPPING.key -> "true", HoodieMetadataConfig.ENABLE.key -> "true"),
+          GreaterThan(AttributeReference("price", IntegerType)(), Literal(3000)), metaClient, isDataSkippingExpected = true, isNoScanExpected = true)
       }
     }
   }
@@ -447,7 +444,7 @@ class TestPartitionStatsIndexWithSql extends HoodieSparkSqlTestBase {
     checkAnswer(s"select key, ColumnStatsMetadata.minValue.member1.value, ColumnStatsMetadata.maxValue.member1.value, ColumnStatsMetadata.isTightBound from hudi_metadata('$tableName') where type=${MetadataPartitionType.PARTITION_STATS.getRecordType} and ColumnStatsMetadata.columnName='price'")(
       Seq(getPartitionStatsIndexKey("ts=10", "price"), 1000, 2000, true),
       Seq(getPartitionStatsIndexKey("ts=20", "price"), 2000, 3000, true),
-      Seq(getPartitionStatsIndexKey("ts=30", "price"), 3000, 4003, false)
+      Seq(getPartitionStatsIndexKey("ts=30", "price"), 4003, 4003, true)
     )
   }
 }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
index a232ca32324..488207adf8c 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMetadataTableValidator.java
@@ -407,9 +407,6 @@ public class TestHoodieMetadataTableValidator extends HoodieSparkClientTestBase
     writeOptions.put(DataSourceWriteOptions.TABLE_NAME().key(), "test_table");
     writeOptions.put("hoodie.table.name", "test_table");
     writeOptions.put(DataSourceWriteOptions.TABLE_TYPE().key(), tableType);
-    if (tableType.equals("COPY_ON_WRITE")) {
-      writeOptions.put(HoodieMetadataConfig.PARTITION_STATS_INDEX_CONSOLIDATE_ON_EVERY_WRITE.key(), "true");
-    }
     writeOptions.put(DataSourceWriteOptions.RECORDKEY_FIELD().key(), "_row_key");
     writeOptions.put(DataSourceWriteOptions.PRECOMBINE_FIELD().key(), "timestamp");
     writeOptions.put(DataSourceWriteOptions.PARTITIONPATH_FIELD().key(), "partition_path");