nsivabalan commented on code in PR #12744:
URL: https://github.com/apache/hudi/pull/12744#discussion_r1936151857
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala:
##########
@@ -81,67 +82,65 @@ class PartitionStatsIndexSupport(spark: SparkSession,
columnStatsRecords
}
- def prunePartitions(fileIndex: HoodieFileIndex,
- queryFilters: Seq[Expression],
- queryReferencedColumns: Seq[String]):
Option[Set[String]] = {
- if (isIndexAvailable && queryFilters.nonEmpty &&
queryReferencedColumns.nonEmpty) {
- if (containsAnySqlFunction(queryFilters)) {
- // If the query contains any SQL function, skip the pruning.
- // Expression Index will be used in such cases, if available.
- Option.empty
- } else {
+ def prunePartitions(fileIndex: HoodieFileIndex, queryFilters:
Seq[Expression]): Option[Set[String]] = {
+ if (isIndexAvailable) {
Review Comment:
we can simplify by folding the emptiness check into the guard, i.e.:
```
isIndexAvailable && queryFilters.nonEmpty) {
```
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala:
##########
@@ -81,67 +82,65 @@ class PartitionStatsIndexSupport(spark: SparkSession,
columnStatsRecords
}
- def prunePartitions(fileIndex: HoodieFileIndex,
- queryFilters: Seq[Expression],
- queryReferencedColumns: Seq[String]):
Option[Set[String]] = {
- if (isIndexAvailable && queryFilters.nonEmpty &&
queryReferencedColumns.nonEmpty) {
- if (containsAnySqlFunction(queryFilters)) {
- // If the query contains any SQL function, skip the pruning.
- // Expression Index will be used in such cases, if available.
- Option.empty
- } else {
+ def prunePartitions(fileIndex: HoodieFileIndex, queryFilters:
Seq[Expression]): Option[Set[String]] = {
+ if (isIndexAvailable) {
+ // Filter out sql queries. Partition stats only supports simple queries
on field attribute without any operation on the field
+ val nonSqlFilters = filterNonSqlExpressions(queryFilters)
+ val indexedCols: Seq[String] =
metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
+ // Filter out queries involving null and value count checks
+ val filteredQueryFilters: Seq[Expression] =
filterExpressionsExcludingNullAndValue(nonSqlFilters, indexedCols)
+ lazy val queryReferencedColumns = collectReferencedColumns(spark,
filteredQueryFilters, tableSchema)
+
+ if (filteredQueryFilters.nonEmpty && queryReferencedColumns.nonEmpty) {
val readInMemory = shouldReadInMemory(fileIndex,
queryReferencedColumns, inMemoryProjectionThreshold)
loadTransposed(queryReferencedColumns, readInMemory, Option.empty,
Option.empty) {
transposedPartitionStatsDF => {
- val indexedCols: Seq[String] =
metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
- if (canLookupInPSI(queryFilters, indexedCols)) {
- try {
-
transposedPartitionStatsDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
- val allPartitions =
transposedPartitionStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
- .collect()
- .map(_.getString(0))
- .toSet
- if (allPartitions.nonEmpty) {
- // PARTITION_STATS index exist for all or some columns in
the filters
- // NOTE: [[translateIntoColumnStatsIndexFilterExpr]] has
covered the case where the
- // column in a filter does not have the stats
available, by making sure such a
- // filter does not prune any partition.
- // to be fixed. HUDI-8836.
- val indexFilter =
queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexedCols =
indexedCols)).reduce(And)
- if (indexFilter.equals(TrueLiteral)) {
- // if there are any non indexed cols or we can't translate
source expr, we can prune partitions based on col stats lookup.
- Some(allPartitions)
- } else {
- Some(transposedPartitionStatsDF.where(new
Column(indexFilter))
-
.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
- .collect()
- .map(_.getString(0))
- .toSet)
- }
+ try {
+
transposedPartitionStatsDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
+ val allPartitions =
transposedPartitionStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+ .collect()
+ .map(_.getString(0))
+ .toSet
+ if (allPartitions.nonEmpty) {
+ // PARTITION_STATS index exist for all or some columns in the
filters
+ // NOTE: [[translateIntoColumnStatsIndexFilterExpr]] has
covered the case where the
+ // column in a filter does not have the stats available,
by making sure such a
+ // filter does not prune any partition.
+ // to be fixed. HUDI-8836.
+ val indexFilter =
queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexedCols =
indexedCols)).reduce(And)
+ if (indexFilter.equals(TrueLiteral)) {
+ // if there are any non indexed cols or we can't translate
source expr, we can prune partitions based on col stats lookup.
+ Some(allPartitions)
} else {
- // PARTITION_STATS index does not exist for any column in
the filters, skip the pruning
- Option.empty
+ Some(transposedPartitionStatsDF.where(new
Column(indexFilter))
Review Comment:
we already have `allPartitions`, right?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/PartitionStatsIndexSupport.scala:
##########
@@ -81,67 +82,65 @@ class PartitionStatsIndexSupport(spark: SparkSession,
columnStatsRecords
}
- def prunePartitions(fileIndex: HoodieFileIndex,
- queryFilters: Seq[Expression],
- queryReferencedColumns: Seq[String]):
Option[Set[String]] = {
- if (isIndexAvailable && queryFilters.nonEmpty &&
queryReferencedColumns.nonEmpty) {
- if (containsAnySqlFunction(queryFilters)) {
- // If the query contains any SQL function, skip the pruning.
- // Expression Index will be used in such cases, if available.
- Option.empty
- } else {
+ def prunePartitions(fileIndex: HoodieFileIndex, queryFilters:
Seq[Expression]): Option[Set[String]] = {
+ if (isIndexAvailable) {
+ // Filter out sql queries. Partition stats only supports simple queries
on field attribute without any operation on the field
+ val nonSqlFilters = filterNonSqlExpressions(queryFilters)
+ val indexedCols: Seq[String] =
metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
+ // Filter out queries involving null and value count checks
+ val filteredQueryFilters: Seq[Expression] =
filterExpressionsExcludingNullAndValue(nonSqlFilters, indexedCols)
+ lazy val queryReferencedColumns = collectReferencedColumns(spark,
filteredQueryFilters, tableSchema)
+
+ if (filteredQueryFilters.nonEmpty && queryReferencedColumns.nonEmpty) {
val readInMemory = shouldReadInMemory(fileIndex,
queryReferencedColumns, inMemoryProjectionThreshold)
loadTransposed(queryReferencedColumns, readInMemory, Option.empty,
Option.empty) {
transposedPartitionStatsDF => {
- val indexedCols: Seq[String] =
metaClient.getIndexMetadata.get().getIndexDefinitions.get(PARTITION_NAME_COLUMN_STATS).getSourceFields.asScala.toSeq
- if (canLookupInPSI(queryFilters, indexedCols)) {
- try {
-
transposedPartitionStatsDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
- val allPartitions =
transposedPartitionStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
- .collect()
- .map(_.getString(0))
- .toSet
- if (allPartitions.nonEmpty) {
- // PARTITION_STATS index exist for all or some columns in
the filters
- // NOTE: [[translateIntoColumnStatsIndexFilterExpr]] has
covered the case where the
- // column in a filter does not have the stats
available, by making sure such a
- // filter does not prune any partition.
- // to be fixed. HUDI-8836.
- val indexFilter =
queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexedCols =
indexedCols)).reduce(And)
- if (indexFilter.equals(TrueLiteral)) {
- // if there are any non indexed cols or we can't translate
source expr, we can prune partitions based on col stats lookup.
- Some(allPartitions)
- } else {
- Some(transposedPartitionStatsDF.where(new
Column(indexFilter))
-
.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
- .collect()
- .map(_.getString(0))
- .toSet)
- }
+ try {
+
transposedPartitionStatsDF.persist(StorageLevel.MEMORY_AND_DISK_SER)
+ val allPartitions =
transposedPartitionStatsDF.select(HoodieMetadataPayload.COLUMN_STATS_FIELD_FILE_NAME)
+ .collect()
+ .map(_.getString(0))
+ .toSet
+ if (allPartitions.nonEmpty) {
+ // PARTITION_STATS index exist for all or some columns in the
filters
+ // NOTE: [[translateIntoColumnStatsIndexFilterExpr]] has
covered the case where the
+ // column in a filter does not have the stats available,
by making sure such a
+ // filter does not prune any partition.
+ // to be fixed. HUDI-8836.
+ val indexFilter =
queryFilters.map(translateIntoColumnStatsIndexFilterExpr(_, indexedCols =
indexedCols)).reduce(And)
+ if (indexFilter.equals(TrueLiteral)) {
+ // if there are any non indexed cols or we can't translate
source expr, we can prune partitions based on col stats lookup.
Review Comment:
minor: the comment should read "we cannot prune" (not "we can prune").
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]