yihua commented on code in PR #18126:
URL: https://github.com/apache/hudi/pull/18126#discussion_r2824941275


##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -502,6 +532,66 @@ object HoodieFileIndex extends Logging {
     val Strict: Val   = Val("strict")
   }
 
+  /**
+   * Re-splits the combined set of filters so that predicates whose attribute references are
+   * all struct-parents of nested partition column names are promoted to partition filters.
+   *
+   * == Root cause of misclassification ==
+   * In `FileSourceStrategy.apply` (Spark's physical planning rule), filters are split into
+   * partition filters and data filters using:
+   * {{{
+   *   val partitionSet = AttributeSet(l.resolve(relation.partitionSchema, resolver))
+   *   val (partitionFilters, dataFilters) = normalizedFilters.partition { f =>
+   *     f.references.subsetOf(partitionSet)
+   *   }
+   * }}}
+   * For a Hudi table with partition column `nested_record.level`, the partition schema holds
+   * a flat `StructField("nested_record.level", StringType)`.  However, when Spark's analyser
+   * resolves the user filter `nested_record.level = 'INFO'`, it sees `nested_record` as a
+   * known `StructType` attribute in the table output and rewrites the predicate as
+   * `GetStructField(AttributeReference("nested_record", StructType(…)), ordinal, "level") = "INFO"`.
+   * That expression's `references` set is `{nested_record}`.  Because `{nested_record}` is not
+   * a subset of the `partitionSet` (which contains the unresolvable flat name `nested_record.level`),
+   * the predicate is classified as a data filter and `listFiles` is called with empty
+   * `partitionFilters`, bypassing partition pruning entirely.
+   *
+   * This method corrects the classification by treating any `AttributeReference` whose logical
+   * name is a struct-parent prefix of a nested partition column name (e.g. `nested_record` for
+   * `nested_record.level`) as a partition attribute, and re-partitions all filters accordingly.
+   *
+   * This is a no-op when `partitionColumnNames` contains no nested (dot-path) names.
+   *
+   * @param partitionColumnNames flat dot-path names of all partition columns (e.g. `["nested_record.level"]`)
+   * @param resolver             case-sensitivity resolver from the active Spark session
+   * @param partitionFilters     filters already classified as partition filters by Spark
+   * @param dataFilters          filters already classified as data filters by Spark
+   * @return corrected `(partitionFilters, dataFilters)` pair
+   */
+  private[hudi] def reclassifyFiltersForNestedPartitionColumns(
+      partitionColumnNames: Seq[String],
+      resolver: (String, String) => Boolean,
+      partitionFilters: Seq[Expression],
+      dataFilters: Seq[Expression]): (Seq[Expression], Seq[Expression]) = {
+    // Only applies to tables that have at least one nested partition column.
+    if (!partitionColumnNames.exists(_.contains("."))) {
+      return (partitionFilters, dataFilters)
+    }
+    val allFilters = partitionFilters ++ dataFilters
+    // Identify AttributeReferences that are exact matches or struct-parent prefixes
+    // of nested partition column names (e.g. "nested_record" for "nested_record.level").
+    val partitionAttrRefs = allFilters.flatMap { expr =>

Review Comment:
   The struct-parent prefix check `col.startsWith(logicalName + ".")` is too broad: it will promote filters on *any* field of the struct, not just the partition field. For example, if `nested_record.level` is the partition column and a query has `nested_record.other_field = 'foo'`, the `AttributeReference("nested_record")` from that filter will also match here, causing the non-partition filter to be incorrectly promoted. Could you also verify the `GetStructField` path (e.g. check that the full dot-path matches a partition column) rather than just the struct root?
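   
   A rough, untested sketch of the stricter classification; `dotPath`, `columnAccesses`, and `isPartitionPredicate` are hypothetical names for illustration, not helpers from this PR:
   ```scala
   import org.apache.spark.sql.catalyst.expressions.{AttributeReference, Expression, GetStructField}

   // Rebuilds the logical dot-path of a (possibly nested) field access, e.g.
   // GetStructField(AttributeReference("nested_record"), ordinal, Some("level"))
   // yields Some("nested_record.level").
   def dotPath(expr: Expression): Option[String] = expr match {
     case a: AttributeReference => Some(a.name)
     case GetStructField(child, _, Some(field)) => dotPath(child).map(_ + "." + field)
     case _ => None
   }

   // A predicate qualifies as a partition filter only if every column access it
   // makes resolves to an exact partition column path, so a filter on
   // nested_record.other_field no longer rides along with nested_record.level.
   def isPartitionPredicate(pred: Expression,
                            partitionColumnNames: Seq[String],
                            resolver: (String, String) => Boolean): Boolean = {
     def columnAccesses(e: Expression): Seq[Expression] = e match {
       case g: GetStructField => Seq(g)        // whole chain counts as one access
       case a: AttributeReference => Seq(a)
       case other => other.children.flatMap(columnAccesses)
     }
     val accesses = columnAccesses(pred)
     accesses.nonEmpty && accesses.forall { access =>
       dotPath(access).exists(path => partitionColumnNames.exists(resolver(_, path)))
     }
   }
   ```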



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala:
##########
@@ -222,10 +225,16 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
   def listMatchingPartitionPaths(predicates: Seq[Expression]): Seq[PartitionPath] = {
     val resolve = spark.sessionState.analyzer.resolver
     val partitionColumnNames = getPartitionColumns
+    // Strip Spark's internal exprId suffix (e.g. #136) so nested_record#136 matches nested_record.level

Review Comment:
   In Spark's `AttributeReference`, the `name` field doesn't include the `#exprId` suffix; that's stored as a separate field and only appears in `toString`. Is there a specific scenario where `ref` would actually contain `#digits` here? If not, this `replaceAll` might be misleading to future readers.
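   
   For reference, a quick illustration in a Spark shell (the printed exprId value is session-dependent):
   ```scala
   import org.apache.spark.sql.catalyst.expressions.AttributeReference
   import org.apache.spark.sql.types.StringType

   val attr = AttributeReference("nested_record", StringType)()
   attr.name     // "nested_record" -- no #id suffix
   attr.toString // e.g. "nested_record#0" -- the exprId only surfaces here
   ```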



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -167,19 +167,49 @@ case class HoodieFileIndex(spark: SparkSession,
   /**
    * Invoked by Spark to fetch list of latest base files per partition.
    *
-   * @param partitionFilters partition column filters
+   * NOTE: For tables with nested partition columns (e.g. `nested_record.level`), Spark's
+   * [[org.apache.spark.sql.execution.datasources.FileSourceScanExec]] uses standard attribute-name
+   * matching when splitting filters into partition vs. data filters. Because the filter expression
+   * for `nested_record.level = 'INFO'` is represented as
+   * `GetStructField(AttributeReference("nested_record"), …)` — whose reference is the *struct*
+   * attribute `nested_record`, not the flat partition attribute `nested_record.level` — Spark
+   * classifies it as a data filter.  This means `partitionFilters` arrives here empty and
+   * `dataFilters` contains the nested-field predicate.  We re-split the combined set of filters
+   * below so that predicates whose only references are struct-parents of partition columns are
+   * treated as partition filters, matching the behaviour of [[HoodiePruneFileSourcePartitions]].
+   *
+   * @param partitionFilters partition column filters (may be incomplete for nested columns)
    * @param dataFilters      data columns filters
    * @return list of PartitionDirectory containing partition to base files mapping
    */
   override def listFiles(partitionFilters: Seq[Expression], dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
-    val slices = filterFileSlices(dataFilters, partitionFilters).flatMap(
+    val (actualPartitionFilters, actualDataFilters) =
+      reclassifyFiltersForNestedPartitionColumns(partitionFilters, dataFilters)

Review Comment:
   Reclassifying the filters looks more like a band-aid fix. Could we fix the integration so that Spark properly classifies the partition vs. data filters?



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/SparkHoodieTableFileIndex.scala:
##########
@@ -253,10 +262,26 @@ class SparkHoodieTableFileIndex(spark: SparkSession,
       //       the whole table
       if (haveProperPartitionValues(partitionPaths.toSeq) && partitionSchema.nonEmpty) {
         val predicate = partitionPruningPredicates.reduce(expressions.And)
+        val partitionFieldNames = partitionSchema.fieldNames
+        def getPartitionColumnPath(expr: Expression): Option[String] = expr match {
+          case a: AttributeReference =>
+            Some(a.name.replaceAll("#\\d+$", ""))
+          case GetStructField(child, _, Some(fieldName)) =>
+            getPartitionColumnPath(child).map(_ + "." + fieldName)
+          case _ => None
+        }
         val transformedPredicate = predicate.transform {
+          case g @ GetStructField(_, _, Some(_)) =>
+            getPartitionColumnPath(g).flatMap { path =>

Review Comment:
   The `partitionFieldNames.indexOf(path)` is always case-sensitive, but the `AttributeReference` branch below uses the session's `resolve` function to respect the Spark case-sensitivity setting. Could you use the resolver here too for consistency?
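   
   A minimal sketch of what that could look like, assuming the `resolve` binding from earlier in this method is in scope:
   ```scala
   // indexWhere + resolver honors spark.sql.caseSensitive, unlike indexOf.
   val fieldIndex = partitionFieldNames.indexWhere(name => resolve(name, path))
   if (fieldIndex >= 0) {
     // ... rewrite against partitionFieldNames(fieldIndex) as before
   }
   ```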



##########
hudi-spark-datasource/hudi-spark3-common/src/main/scala/org/apache/spark/sql/hudi/analysis/Spark3HoodiePruneFileSourcePartitions.scala:
##########
@@ -105,11 +105,31 @@ private object Spark3HoodiePruneFileSourcePartitions extends PredicateHelper {
     Project(projects, withFilter)
   }
 
+  /**
+   * Returns the logical name of an attribute by stripping Spark's internal exprId suffix (e.g. #136).
+   * Filter expressions may reference columns with these suffixed names (e.g. nested_record#136.level),
+   * while partition schema uses logical names (e.g. nested_record.level).
+   */
+  private def logicalAttributeName(attr: AttributeReference): String = {
+    attr.name.replaceAll("#\\d+$", "")
+  }
+
+  /**
+   * Returns true if the given attribute references a partition column. An attribute references a
+   * partition column if its logical name (without #exprId) equals a partition column name or
+   * is the struct parent of a nested partition path (e.g. nested_record for nested_record.level).
+   */
+  private def isPartitionColumnReference(attr: AttributeReference, partitionSchema: StructType): Boolean = {
+    val logicalName = logicalAttributeName(attr)
+    partitionSchema.names.contains(logicalName) ||
+      partitionSchema.names.exists(_.startsWith(logicalName + "."))
+  }
+
   def getPartitionFiltersAndDataFilters(partitionSchema: StructType,

Review Comment:
   Same concern as in `reclassifyFiltersForNestedPartitionColumns`: this `startsWith(logicalName + ".")` check will match `AttributeReference("nested_record")` from *any* filter on the `nested_record` struct, not just the partition field. A filter on `nested_record.some_other_field` would be incorrectly treated as a partition filter.
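   
   To make the false positive concrete, a small self-contained illustration (hypothetical schema, not code from this PR):
   ```scala
   import org.apache.spark.sql.types.{StringType, StructField, StructType}

   val partitionSchema = StructType(Seq(StructField("nested_record.level", StringType)))
   val logicalName = "nested_record" // struct root from a filter on nested_record.some_other_field

   // Passes even though the filter never references the partition field:
   partitionSchema.names.exists(_.startsWith(logicalName + ".")) // true
   ```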



##########
hudi-spark-datasource/hudi-spark-common/src/main/scala/org/apache/hudi/HoodieFileIndex.scala:
##########
@@ -167,19 +167,49 @@ case class HoodieFileIndex(spark: SparkSession,
   /**
    * Invoked by Spark to fetch list of latest base files per partition.
    *
-   * @param partitionFilters partition column filters
+   * NOTE: For tables with nested partition columns (e.g. `nested_record.level`), Spark's
+   * [[org.apache.spark.sql.execution.datasources.FileSourceScanExec]] uses standard attribute-name
+   * matching when splitting filters into partition vs. data filters. Because the filter expression
+   * for `nested_record.level = 'INFO'` is represented as

Review Comment:
   How does Spark handle nested partition filters for parquet tables? Does `FileSourceScanExec` properly classify the partition vs. data filters? If so, should something elsewhere in the Hudi Spark integration be fixed, e.g., how the partition fields are provided?


