nsivabalan commented on code in PR #12050:
URL: https://github.com/apache/hudi/pull/12050#discussion_r1792605271
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2127,29 +2159,76 @@ public static HoodieData<HoodieRecord>
convertMetadataToPartitionStatsRecords(Ho
.collect(Collectors.toList());
int parallelism = Math.max(Math.min(partitionedWriteStats.size(),
metadataConfig.getPartitionStatsIndexParallelism()), 1);
+ boolean shouldScanColStatsForTightBound =
MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(dataMetaClient)
+ && metadataConfig.isPartitionStatsIndexTightBoundEnabled() &&
WriteOperationType.isPartitionStatsTightBoundRequired(commitMetadata.getOperationType());
+ HoodieTableMetadata tableMetadata;
+ if (shouldScanColStatsForTightBound) {
+ tableMetadata = HoodieTableMetadata.create(engineContext,
dataMetaClient.getStorage(), metadataConfig,
dataMetaClient.getBasePath().toString());
+ } else {
+ tableMetadata = null;
+ }
return engineContext.parallelize(partitionedWriteStats,
parallelism).flatMap(partitionedWriteStat -> {
final String partitionName =
partitionedWriteStat.get(0).getPartitionPath();
- // Step 1: Collect Column Metadata for Each File
+ // Step 1: Collect Column Metadata for Each File part of current
commit metadata
List<List<HoodieColumnRangeMetadata<Comparable>>> fileColumnMetadata =
partitionedWriteStat.stream()
- .map(writeStat -> translateWriteStatToFileStats(writeStat,
dataMetaClient, columnsToIndex))
- .collect(Collectors.toList());
+ .map(writeStat -> translateWriteStatToFileStats(writeStat,
dataMetaClient, columnsToIndex, tableSchema))
+ .collect(Collectors.toList());
+ if (shouldScanColStatsForTightBound) {
+ checkState(tableMetadata != null, "tableMetadata should not be null
when scanning metadata table");
+ // Collect Column Metadata for Each File part of active file system
view of latest snapshot
+ // Get all file names, including log files, in a set from the file
slices
+ Set<String> fileNames = getPartitionLatestFileSlices(dataMetaClient,
Option.empty(), partitionName).stream()
+ .flatMap(fileSlice -> Stream.concat(
+
Stream.of(fileSlice.getBaseFile().map(HoodieBaseFile::getFileName).orElse(null)),
+ fileSlice.getLogFiles().map(HoodieLogFile::getFileName)))
+ .filter(Objects::nonNull)
+ .collect(Collectors.toSet());
+ // Fetch metadata table COLUMN_STATS partition records for above
files
+ List<HoodieColumnRangeMetadata<Comparable>> partitionColumnMetadata =
+
tableMetadata.getRecordsByKeyPrefixes(generateKeyPrefixes(columnsToIndex,
partitionName), MetadataPartitionType.COLUMN_STATS.getPartitionPath(), false)
Review Comment:
should we try to find partitions that have gaps w.r.t. the current delta write stats,
and only compute stats for those?
For example, say there are 10 partitions in total,
and 6 of them are going through compaction.
Out of those 6, 4 are fully compacted, i.e. all of their file groups are compacted, but the
remaining 2 partitions have only a few file groups compacted.
So we only need to re-compute the partition stats for partitions 5 to 10, and can
ignore the first 4 (which already have full stats in the current delta write stats).
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -2127,29 +2159,76 @@ public static HoodieData<HoodieRecord>
convertMetadataToPartitionStatsRecords(Ho
.collect(Collectors.toList());
int parallelism = Math.max(Math.min(partitionedWriteStats.size(),
metadataConfig.getPartitionStatsIndexParallelism()), 1);
+ boolean shouldScanColStatsForTightBound =
MetadataPartitionType.COLUMN_STATS.isMetadataPartitionAvailable(dataMetaClient)
+ && metadataConfig.isPartitionStatsIndexTightBoundEnabled() &&
WriteOperationType.isPartitionStatsTightBoundRequired(commitMetadata.getOperationType());
+ HoodieTableMetadata tableMetadata;
+ if (shouldScanColStatsForTightBound) {
+ tableMetadata = HoodieTableMetadata.create(engineContext,
dataMetaClient.getStorage(), metadataConfig,
dataMetaClient.getBasePath().toString());
+ } else {
+ tableMetadata = null;
Review Comment:
can we move the logic below inside this if block,
to avoid assigning null to tableMetadata?
##########
hudi-spark-datasource/hudi-spark/src/test/scala/org/apache/hudi/functional/TestPartitionStatsIndexWithSql.scala:
##########
@@ -238,30 +238,37 @@ class TestPartitionStatsIndexWithSql extends
HoodieSparkSqlTestBase {
test(s"Test partition stats index on int type field with update and file
pruning") {
Seq("cow", "mor").foreach { tableType =>
- withTempDir { tmp =>
- val tableName = generateTableName
- val tablePath = s"${tmp.getCanonicalPath}/$tableName"
- spark.sql(
- s"""
- |create table $tableName (
- | id int,
- | name string,
- | price int,
- | ts long
- |) using hudi
- |partitioned by (ts)
- |tblproperties (
- | type = '$tableType',
- | primaryKey = 'id',
- | preCombineField = 'price',
- | hoodie.metadata.index.partition.stats.enable = 'true',
- | hoodie.metadata.index.column.stats.column.list = 'price'
- |)
- |location '$tablePath'
- |""".stripMargin
- )
-
- writeAndValidatePartitionStats(tableName, tablePath)
+ Seq(true, false).foreach { shouldCompact =>
+ withTempDir { tmp =>
+ val tableName = generateTableName
+ val tablePath = s"${tmp.getCanonicalPath}/$tableName"
+ spark.sql(
+ s"""
+ |create table $tableName (
+ | id int,
+ | name string,
+ | price int,
+ | ts long
+ |) using hudi
+ |partitioned by (ts)
+ |tblproperties (
+ | type = '$tableType',
+ | primaryKey = 'id',
+ | preCombineField = 'price',
+ | hoodie.metadata.index.partition.stats.enable = 'true',
+ | hoodie.metadata.index.column.stats.column.list = 'price'
+ |)
+ |location '$tablePath'
+ |""".stripMargin
+ )
+
+ // trigger compaction after update and validate stats
+ if (tableType == "mor" && shouldCompact) {
+ spark.sql("set hoodie.compact.inline=true")
+ spark.sql("set hoodie.compact.inline.max.delta.commits=2")
+ }
+ writeAndValidatePartitionStats(tableName, tablePath)
Review Comment:
I mean, how do we expect the validation to succeed?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1215,20 +1204,25 @@ private static Stream<HoodieRecord>
getColumnStatsRecords(String partitionPath,
}
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadata =
- readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient,
columnsToIndex);
+ readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient,
columnsToIndex, false, Option.empty());
return HoodieMetadataPayload.createColumnStatsRecords(partitionPath,
columnRangeMetadata, false);
}
private static List<HoodieColumnRangeMetadata<Comparable>>
readColumnRangeMetadataFrom(String filePath,
HoodieTableMetaClient datasetMetaClient,
-
List<String> columnsToIndex) {
+
List<String> columnsToIndex,
+
boolean shouldReadColumnStatsForLogFiles,
+
Option<Schema> writerSchemaOpt) {
try {
+ StoragePath fullFilePath = new
StoragePath(datasetMetaClient.getBasePath(), filePath);
if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
- StoragePath fullFilePath = new
StoragePath(datasetMetaClient.getBasePath(), filePath);
return HoodieIOFactory.getIOFactory(datasetMetaClient.getStorage())
.getFileFormatUtils(HoodieFileFormat.PARQUET)
.readColumnStatsFromMetadata(datasetMetaClient.getStorage(),
fullFilePath, columnsToIndex);
+ } else if (FSUtils.isLogFile(fullFilePath) &&
shouldReadColumnStatsForLogFiles) {
+ LOG.warn("Reading log file: {}, to build column range metadata.",
fullFilePath);
+ return getLogFileColumnRangeMetadata(fullFilePath.toString(),
datasetMetaClient, columnsToIndex, writerSchemaOpt);
Review Comment:
oh, I was actually confusing this with the functional/secondary index. This patch is just for
partition-level stats on top of column stats, right?
In that case, HoodieDeltaWriteStat will already have the column-level stats.
Can't we reuse that to compute the partition-level stats and use it here?
##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieTableMetadataUtil.java:
##########
@@ -1215,20 +1204,25 @@ private static Stream<HoodieRecord>
getColumnStatsRecords(String partitionPath,
}
List<HoodieColumnRangeMetadata<Comparable>> columnRangeMetadata =
- readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient,
columnsToIndex);
+ readColumnRangeMetadataFrom(filePartitionPath, datasetMetaClient,
columnsToIndex, false, Option.empty());
return HoodieMetadataPayload.createColumnStatsRecords(partitionPath,
columnRangeMetadata, false);
}
private static List<HoodieColumnRangeMetadata<Comparable>>
readColumnRangeMetadataFrom(String filePath,
HoodieTableMetaClient datasetMetaClient,
-
List<String> columnsToIndex) {
+
List<String> columnsToIndex,
+
boolean shouldReadColumnStatsForLogFiles,
+
Option<Schema> writerSchemaOpt) {
try {
+ StoragePath fullFilePath = new
StoragePath(datasetMetaClient.getBasePath(), filePath);
if (filePath.endsWith(HoodieFileFormat.PARQUET.getFileExtension())) {
- StoragePath fullFilePath = new
StoragePath(datasetMetaClient.getBasePath(), filePath);
return HoodieIOFactory.getIOFactory(datasetMetaClient.getStorage())
.getFileFormatUtils(HoodieFileFormat.PARQUET)
.readColumnStatsFromMetadata(datasetMetaClient.getStorage(),
fullFilePath, columnsToIndex);
+ } else if (FSUtils.isLogFile(fullFilePath) &&
shouldReadColumnStatsForLogFiles) {
+ LOG.warn("Reading log file: {}, to build column range metadata.",
fullFilePath);
+ return getLogFileColumnRangeMetadata(fullFilePath.toString(),
datasetMetaClient, columnsToIndex, writerSchemaOpt);
Review Comment:
why are we not computing these stats in the append handle and sending them here?
That's what we are doing for the column stats index, right? Shouldn't we be doing the
same here?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]