yihua commented on code in PR #18126:
URL: https://github.com/apache/hudi/pull/18126#discussion_r2824941275
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -502,6 +532,66 @@ object HoodieFileIndex extends Logging {
val Strict: Val = Val("strict")
}
+  /**
+   * Re-splits the combined set of filters so that predicates whose attribute references are
+   * all struct-parents of nested partition column names are promoted to partition filters.
+   *
+   * == Root cause of misclassification ==
+   * In `FileSourceStrategy.apply` (Spark's physical planning rule), filters are split into
+   * partition filters and data filters using:
+   * {{{
+   *   val partitionSet = AttributeSet(l.resolve(relation.partitionSchema, resolver))
+   *   val (partitionFilters, dataFilters) = normalizedFilters.partition { f =>
+   *     f.references.subsetOf(partitionSet)
+   *   }
+   * }}}
+   * For a Hudi table with partition column `nested_record.level`, the partition schema holds
+   * a flat `StructField("nested_record.level", StringType)`. However, when Spark's analyser
+   * resolves the user filter `nested_record.level = 'INFO'`, it sees `nested_record` as a
+   * known `StructType` attribute in the table output and rewrites the predicate as
+   * `GetStructField(AttributeReference("nested_record", StructType(…)), ordinal, "level") = "INFO"`.
+   * That expression's `references` set is `{nested_record}`. Because `{nested_record}` is not
+   * a subset of the `partitionSet` (which contains the unresolvable flat name `nested_record.level`),
+   * the predicate is classified as a data filter and `listFiles` is called with empty
+   * `partitionFilters`, bypassing partition pruning entirely.
+   *
+   * This method corrects the classification by treating any `AttributeReference` whose logical
+   * name is a struct-parent prefix of a nested partition column name (e.g. `nested_record` for
+   * `nested_record.level`) as a partition attribute, and re-partitions all filters accordingly.
+   *
+   * This is a no-op when `partitionColumnNames` contains no nested (dot-path) names.
+   *
+   * @param partitionColumnNames flat dot-path names of all partition columns (e.g. `["nested_record.level"]`)
+   * @param resolver             case-sensitivity resolver from the active Spark session
+   * @param partitionFilters     filters already classified as partition filters by Spark
+   * @param dataFilters          filters already classified as data filters by Spark
+   * @return corrected `(partitionFilters, dataFilters)` pair
+   */
+  private[hudi] def reclassifyFiltersForNestedPartitionColumns(
+      partitionColumnNames: Seq[String],
+      resolver: (String, String) => Boolean,
+      partitionFilters: Seq[Expression],
+      dataFilters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
+    // Only applies to tables that have at least one nested partition column.
+    if (!partitionColumnNames.exists(_.contains("."))) {
+      return (partitionFilters, dataFilters)
+    }
+    val allFilters = partitionFilters ++ dataFilters
+    // Identify AttributeReferences that are exact matches or struct-parent prefixes
+    // of nested partition column names (e.g. "nested_record" for "nested_record.level").
+    val partitionAttrRefs = allFilters.flatMap { expr =>
Review Comment:
   The struct-parent prefix check `col.startsWith(logicalName + ".")` is too broad — it will promote filters on *any* field of the struct, not just the partition field. For example, if `nested_record.level` is the partition column and a query has `nested_record.other_field = 'foo'`, the `AttributeReference("nested_record")` from that filter will also match here, causing the non-partition filter to be incorrectly promoted. Could you also verify the `GetStructField` path (e.g. check that the full dot-path matches a partition column) rather than just the struct root?
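   Something along these lines is what I have in mind, as a rough sketch (the helper names `fullDotPath` and `referencesOnlyPartitionPaths` are hypothetical, not code from this PR, and the sketch assumes single-level struct nesting):
   ```scala
   import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GetStructField}

   // Rebuild the full dot-path of a struct access, e.g.
   // GetStructField(AttributeReference("nested_record"), 0, Some("level")) -> "nested_record.level".
   def fullDotPath(expr: Expression): Option[String] = expr match {
     case a: AttributeReference => Some(a.name)
     case GetStructField(child, _, Some(fieldName)) => fullDotPath(child).map(_ + "." + fieldName)
     case _ => None
   }

   // Promote a filter only when every struct access in it resolves to an actual
   // partition column path, not merely to the struct root.
   def referencesOnlyPartitionPaths(filter: Expression,
                                    partitionColumnNames: Seq[String],
                                    resolver: (String, String) => Boolean): Boolean = {
     val structAccesses = filter.collect { case g: GetStructField => g }
     structAccesses.nonEmpty && structAccesses.forall { g =>
       fullDotPath(g).exists(path => partitionColumnNames.exists(resolver(_, path)))
     }
   }
   ```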
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala:
##########
@@ -222,10 +225,16 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
   def listMatchingPartitionPaths(predicates: Seq[Expression]): Seq[PartitionPath] = {
     val resolve = spark.sessionState.analyzer.resolver
     val partitionColumnNames = getPartitionColumns
+    // Strip Spark's internal exprId suffix (e.g. #136) so nested_record#136 matches nested_record.level
Review Comment:
   In Spark's `AttributeReference`, the `name` field doesn't include the `#exprId` suffix — that's stored as a separate field and only appears in `toString`. Is there a specific scenario where `ref` would actually contain `#digits` here? If not, this `replaceAll` might be misleading to future readers.
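   For reference, the distinction can be seen with a standalone snippet (illustrative only, not from this PR):
   ```scala
   import org.apache.spark.sql.catalyst.expressions.AttributeReference
   import org.apache.spark.sql.types.StringType

   val attr = AttributeReference("nested_record", StringType)()
   attr.name      // "nested_record"   (no exprId suffix)
   attr.toString  // e.g. "nested_record#136" (exprId appears only in the string rendering)
   ```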
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -167,19 +167,49 @@ case class HoodieFileIndex(spark: SparkSession,
   /**
    * Invoked by Spark to fetch list of latest base files per partition.
    *
-   * @param partitionFilters partition column filters
+   * NOTE: For tables with nested partition columns (e.g. `nested_record.level`), Spark's
+   * [[org.apache.spark.sql.execution.datasources.FileSourceScanExec]] uses standard attribute-name
+   * matching when splitting filters into partition vs. data filters. Because the filter expression
+   * for `nested_record.level = 'INFO'` is represented as
+   * `GetStructField(AttributeReference("nested_record"), …)` — whose reference is the *struct*
+   * attribute `nested_record`, not the flat partition attribute `nested_record.level` — Spark
+   * classifies it as a data filter. This means `partitionFilters` arrives here empty and
+   * `dataFilters` contains the nested-field predicate. We re-split the combined set of filters
+   * below so that predicates whose only references are struct-parents of partition columns are
+   * treated as partition filters, matching the behaviour of [[HoodiePruneFileSourcePartitions]].
+   *
+   * @param partitionFilters partition column filters (may be incomplete for nested columns)
    * @param dataFilters data columns filters
    * @return list of PartitionDirectory containing partition to base files mapping
    */
   override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
-    val slices = filterFileSlices(dataFilters, partitionFilters).flatMap(
+    val (actualPartitionFilters, actualDataFilters) =
+      reclassifyFiltersForNestedPartitionColumns(partitionFilters, dataFilters)
Review Comment:
   Reclassifying the filters looks more like a band-aid fix. Could we fix the integration so that Spark properly classifies the partition vs. data filters in the first place?
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala:
##########
@@ -253,10 +262,26 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
       // the whole table
       if (haveProperPartitionValues(partitionPaths.toSeq) && partitionSchema.nonEmpty) {
         val predicate = partitionPruningPredicates.reduce(expressions.And)
+        val partitionFieldNames = partitionSchema.fieldNames
+        def getPartitionColumnPath(expr: Expression): Option[String] = expr match {
+          case a: AttributeReference =>
+            Some(a.name.replaceAll("#\\d+$", ""))
+          case GetStructField(child, _, Some(fieldName)) =>
+            getPartitionColumnPath(child).map(_ + "." + fieldName)
+          case _ => None
+        }
         val transformedPredicate = predicate.transform {
+          case g @ GetStructField(_, _, Some(_)) =>
+            getPartitionColumnPath(g).flatMap { path =>
Review Comment:
   The `partitionFieldNames.indexOf(path)` is always case-sensitive, but the `AttributeReference` branch below uses the session's `resolve` function to respect the Spark case-sensitivity setting. Could you use the resolver here too for consistency?
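   A resolver-aware lookup could be as small as this (sketch; `resolve` is the session resolver already bound earlier in `listMatchingPartitionPaths`):
   ```scala
   // Resolver-aware replacement for partitionFieldNames.indexOf(path): honours
   // spark.sql.caseSensitive instead of always comparing case-sensitively.
   def partitionFieldOrdinal(partitionFieldNames: Array[String],
                             path: String,
                             resolve: (String, String) => Boolean): Int =
     partitionFieldNames.indexWhere(fieldName => resolve(fieldName, path))
   ```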
##########
hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/analysis/Spark3HoodiePruneFileSourcePartitions.scala:
##########
@@ -105,11 +105,31 @@ private object Spark3HoodiePruneFileSourcePartitions extends PredicateHelper {
Project(projects, withFilter)
}
+  /**
+   * Returns the logical name of an attribute by stripping Spark's internal exprId suffix (e.g. #136).
+   * Filter expressions may reference columns with these suffixed names (e.g. nested_record#136.level),
+   * while the partition schema uses logical names (e.g. nested_record.level).
+   */
+  private def logicalAttributeName(attr: AttributeReference): String = {
+    attr.name.replaceAll("#\\d+$", "")
+  }
+
+  /**
+   * Returns true if the given attribute references a partition column. An attribute references a
+   * partition column if its logical name (without #exprId) equals a partition column name or
+   * is the struct parent of a nested partition path (e.g. nested_record for nested_record.level).
+   */
+  private def isPartitionColumnReference(attr: AttributeReference, partitionSchema: StructType): Boolean = {
+    val logicalName = logicalAttributeName(attr)
+    partitionSchema.names.contains(logicalName) ||
+      partitionSchema.names.exists(_.startsWith(logicalName + "."))
+  }
+
   def getPartitionFiltersAndDataFilters(partitionSchema: StructType,
Review Comment:
   Same concern as in `reclassifyFiltersForNestedPartitionColumns`: this `startsWith(logicalName + ".")` check will match `AttributeReference("nested_record")` from *any* filter on the `nested_record` struct, not just the partition field. A filter on `nested_record.some_other_field` would be incorrectly treated as a partition filter.
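   A concrete counterexample (hypothetical field names, runnable against catalyst directly):
   ```scala
   import org.apache.spark.sql.catalyst.expressions.AttributeReference
   import org.apache.spark.sql.types.{StringType, StructField, StructType}

   // Partition schema holds the flat nested path "nested_record.level".
   val partitionSchema = StructType(Seq(StructField("nested_record.level", StringType)))

   // A filter on a different field of the same struct still references only the struct root...
   val attr = AttributeReference("nested_record", StructType(Seq(
     StructField("level", StringType),
     StructField("other_field", StringType))))()

   // ...so the struct-parent prefix check matches, and a predicate on
   // nested_record.other_field would be promoted to a partition filter.
   partitionSchema.names.exists(_.startsWith(attr.name + "."))  // true
   ```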
##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -167,19 +167,49 @@ case class HoodieFileIndex(spark: SparkSession,
   /**
    * Invoked by Spark to fetch list of latest base files per partition.
    *
-   * @param partitionFilters partition column filters
+   * NOTE: For tables with nested partition columns (e.g. `nested_record.level`), Spark's
+   * [[org.apache.spark.sql.execution.datasources.FileSourceScanExec]] uses standard attribute-name
+   * matching when splitting filters into partition vs. data filters. Because the filter expression
+   * for `nested_record.level = 'INFO'` is represented as
Review Comment:
   How does Spark handle nested partition filters for parquet tables? Does `FileSourceScanExec` properly classify the partition vs. data filters? If so, should something else in the Hudi Spark integration be fixed instead, e.g., how the partition fields are provided?
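   For comparison, my understanding of the native parquet path (worth double-checking): Spark file sources only support top-level, directory-derived partition columns, so the filter's reference *is* the partition attribute and `FileSourceStrategy`'s `subsetOf` check classifies it correctly. A quick probe (illustrative; the path is arbitrary):
   ```scala
   import org.apache.spark.sql.functions.{col, lit}

   spark.range(10).withColumn("level", lit("INFO"))
     .write.partitionBy("level").parquet("/tmp/parquet_probe")

   val df = spark.read.parquet("/tmp/parquet_probe").filter(col("level") === "INFO")
   // The physical plan should list the predicate under PartitionFilters,
   // e.g. PartitionFilters: [isnotnull(level#..), (level#.. = INFO)]
   println(df.queryExecution.executedPlan)
   ```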