This is an automated email from the ASF dual-hosted git repository.

zhztheplayer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git


The following commit(s) were added to refs/heads/main by this push:
     new bdaad2a047 [GLUTEN-10511][VL][Delta] Fix wrong result with partition filters under column mapping (#12240)
bdaad2a047 is described below

commit bdaad2a047c91d98e98f610e64cb75ce80f0f6df
Author: EJ Song <[email protected]>
AuthorDate: Tue Jun 9 02:25:05 2026 -0700

    [GLUTEN-10511][VL][Delta] Fix wrong result with partition filters under column mapping (#12240)
    
    Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
 .../gluten/execution/DeltaScanTransformer.scala    |  31 ++-
 .../gluten/extension/DeltaPostTransformRules.scala |  90 ++++++---
 .../org/apache/gluten/execution/DeltaSuite.scala   | 213 +++++++++++++++++++++
 3 files changed, 305 insertions(+), 29 deletions(-)

diff --git a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
index 1be03dd404..154b735554 100644
--- a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
+++ b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
@@ -20,9 +20,10 @@ import org.apache.gluten.sql.shims.SparkShimLoader
 import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
 
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference, Expression}
 import org.apache.spark.sql.catalyst.plans.QueryPlan
 import org.apache.spark.sql.connector.read.streaming.SparkDataStream
+import org.apache.spark.sql.delta.{DeltaParquetFileFormat, NoMapping}
 import org.apache.spark.sql.execution.FileSourceScanExec
 import org.apache.spark.sql.execution.datasources.HadoopFsRelation
 import org.apache.spark.sql.types.StructType
@@ -55,6 +56,34 @@ case class DeltaScanTransformer(
 
   override lazy val fileFormat: ReadFileFormat = ReadFileFormat.ParquetReadFormat
 
+  // For Delta column-mapping tables, `dataFilters` on the scan node are LOGICAL-named so Delta's
+  // file index (`PreparedDeltaFileIndex.matchingFiles`, `Snapshot.filesForScan`) can do partition
+  // pruning and stats-based file skipping -- both resolve filter attrs against logical schemas.
+  //
+  // The native (Velox) side, however, must see PHYSICAL names: `output` and `dataSchema` are
+  // physical (so the parquet reader finds the right column), and `BasicScanExecTransformer`
+  // matches `scanFilters` against `pushDownFilters` (built from a `Filter` that references the
+  // physical-named scan output) by `AttributeReference.equals`, which compares names. Without
+  // this override, the logical-named `scanFilters` and physical-named `pushDownFilters` would
+  // never match, causing duplicate filter evaluation in the substrait plan.
+  //
+  // Translate by exprId match against `output` rather than by re-running Delta's column-mapping
+  // helpers; exprIds are stable across the post-transform rewrite and don't require a second
+  // metadata lookup.
+  //
+  // See `DeltaPostTransformRules.transformColumnMappingPlan` for the full picture of which
+  // fields stay logical vs. become physical, and the longer-term cleanup direction (do all
+  // physical translation at substrait emission time so this override and the alias-back
+  // ProjectExec both go away).
+  override def scanFilters: Seq[Expression] = relation.fileFormat match {
+    case d: DeltaParquetFileFormat if d.columnMappingMode != NoMapping =>
+      val physicalByExprId = output.collect { case ar: AttributeReference => ar.exprId -> ar }.toMap
+      dataFilters.map(_.transformDown {
+        case ar: AttributeReference => physicalByExprId.getOrElse(ar.exprId, ar)
+      })
+    case _ => dataFilters
+  }
+
   override protected def doValidateInternal(): ValidationResult = {
     if (
       requiredSchema.fields.exists(
diff --git a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
index e16a6d12fd..421db09732 100644
--- a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
+++ b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
@@ -164,20 +164,55 @@ object DeltaPostTransformRules {
   }
 
   /**
-   * This method is only used for Delta ColumnMapping FileFormat(e.g. nameMapping and idMapping)
-   * transform the metadata of Delta into Parquet's, each plan should only be transformed once.
+   * Used for Delta ColumnMapping FileFormat (nameMapping and idMapping). Each plan is transformed
+   * at most once; the first run is tagged so re-runs are no-ops.
+   *
+   * Background: with column mapping, Delta files are written with PHYSICAL column names while
+   * Delta's metadata (partition schema, column stats) keeps LOGICAL names. Vanilla Spark + Delta
+   * resolves this asymmetry inside `DeltaParquetFileFormat.buildReaderWithPartitionValues`:
+   * everything on the scan node stays logical, and physical translation happens just-in-time when
+   * handing data and filters to the parquet reader. Gluten bypasses that hook (it goes to native
+   * via Substrait), so the translation has to live somewhere on our side.
+   *
+   * What this rule produces -- the parts that diverge from vanilla Spark are commented at each
+   * site. The split-by-consumer is asymmetric on purpose:
+   *
+   *   - `output`, `dataSchema`, and the data fields of `requiredSchema` ==> PHYSICAL. These flow
+   *     into the substrait `NamedStruct` that Velox uses to look up columns in the parquet file.
+   *     The parquet column name is the physical name, so Velox needs the physical name on the
+   *     schema side. A `ProjectExecTransformer` is added below to alias these back to logical names
+   *     for downstream Spark operators.
+   *   - `partitionSchema`, `partitionFilters`, `dataFilters`, partition fields of `requiredSchema`
+   *     ==> LOGICAL. These are consumed by Delta's `PreparedDeltaFileIndex.matchingFiles` and
+   *     `Snapshot.filesForScan`, which resolve filters and partition values against
+   *     `metadata.partitionSchema` and the column-stats schema -- both LOGICAL. Rewriting any of
+   *     these to physical names was the cause of issue #10511 (partition pruning silently no-op'd)
+   *     and would also disable file-level stats skipping.
+   *   - `DeltaScanTransformer.scanFilters` (override) ==> PHYSICAL, translated from `dataFilters`
+   *     by exprId match against `output`. Substrait binds filters by exprId rather than name, so it
+   *     would be tempting to pass logical-named filters straight through; but
+   *     `BasicScanExecTransformer.filterExprs()` does a name-and-exprId equality check
+   *     (`scanFilters.partition(pushDownFilters.contains(_))`) against the physical-named
+   *     `pushDownFilters` from the upstream `Filter`. The override ensures both sides match.
+   *
+   * Future cleanup (out of scope for this fix): the cleaner shape is to mirror vanilla Spark
+   * exactly -- keep EVERYTHING on the scan node logical, and do physical translation only at
+   * substrait emission time (e.g. inside the `NamedStruct`/`ReadRel` build in
+   * `BasicScanExecTransformer.doTransform`). That removes the alias-back project below and the
+   * `scanFilters` override, but it requires plumbing Delta-specific physical-name lookup into the
+   * substrait emitter and is a multi-module refactor.
    */
   private def transformColumnMappingPlan(plan: SparkPlan): SparkPlan = plan match {
     case plan: DeltaScanTransformer =>
       val fmt = plan.relation.fileFormat.asInstanceOf[DeltaParquetFileFormat]
 
-      // transform HadoopFsRelation
       val relation = plan.relation
+      val partitionColNames = relation.partitionSchema.fields.iterator.map(_.name).toSet
+      def isPartitionCol(name: String): Boolean = partitionColNames.contains(name)
+
+      // transform HadoopFsRelation: only `dataSchema` needs physical names (those are the
+      // columns actually stored in parquet). `partitionSchema` stays logical.
       val newFsRelation = relation.copy(
-        partitionSchema = DeltaColumnMapping.createPhysicalSchema(
-          relation.partitionSchema,
-          fmt.referenceSchema,
-          fmt.columnMappingMode),
         dataSchema = DeltaColumnMapping.createPhysicalSchema(
           relation.dataSchema,
           fmt.referenceSchema,
@@ -193,6 +228,8 @@ object DeltaPostTransformRules {
           attr
         } else if (isInputFileRelatedAttribute(attr)) {
           attr
+        } else if (isPartitionCol(attr.name)) {
+          attr
         } else {
           DeltaColumnMapping
             .createPhysicalAttributes(Seq(attr), fmt.referenceSchema, fmt.columnMappingMode)
@@ -204,31 +241,28 @@ object DeltaPostTransformRules {
         newAttr
       }
       val newOutput = plan.output.map(o => mapAttribute(o))
-      // transform dataFilters
-      val newDataFilters = plan.dataFilters.map {
-        e =>
-          e.transformDown {
-            case attr: AttributeReference =>
-              mapAttribute(attr)
-          }
-      }
-      // transform partitionFilters
-      val newPartitionFilters = plan.partitionFilters.map {
-        e =>
-          e.transformDown {
-            case attr: AttributeReference =>
-              mapAttribute(attr)
-          }
-      }
-      // replace tableName in schema with physicalName
+      // dataFilters / partitionFilters: kept LOGICAL on the scan node so Delta's file index
+      // (partition pruning + stats-based file skipping) resolves columns correctly. The native
+      // (Velox) side gets physical-translated copies via `DeltaScanTransformer.scanFilters`.
+      val newDataFilters = plan.dataFilters
+      val newPartitionFilters = plan.partitionFilters
+
+      // requiredSchema: rewrite data fields to physical, keep partition fields logical.
+      val physicalRequiredSchema = DeltaColumnMapping.createPhysicalSchema(
+        plan.requiredSchema,
+        fmt.referenceSchema,
+        fmt.columnMappingMode)
+      val newRequiredSchema = StructType(
+        physicalRequiredSchema.fields.zip(plan.requiredSchema.fields).map {
+          case (_, logical) if isPartitionCol(logical.name) => logical
+          case (physical, _) => physical
+        })
+
       val scanExecTransformer = new DeltaScanTransformer(
         newFsRelation,
         plan.stream,
         newOutput,
-        DeltaColumnMapping.createPhysicalSchema(
-          plan.requiredSchema,
-          fmt.referenceSchema,
-          fmt.columnMappingMode),
+        newRequiredSchema,
         newPartitionFilters,
         plan.optionalBucketSet,
         plan.optionalNumCoalescedBuckets,
diff --git a/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala b/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala
index 031bf46034..1f95ec7dac 100644
--- a/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala
+++ b/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala
@@ -82,6 +82,219 @@ abstract class DeltaSuite extends WholeStageTransformerSuite {
     }
   }
 
+  // Counts files Delta will read for `df`. Driven by `PreparedDeltaFileIndex.inputFiles`, which
+  // is the post-pruning, post-stats-skipping file set computed by `PrepareDeltaScan`. Useful for
+  // asserting that Delta's file index actually pruned, regardless of what Gluten does later.
+  private def deltaInputFileCount(df: org.apache.spark.sql.DataFrame): Int =
+    df.inputFiles.length
+
+  // Counts the partition directories selected by the executed scan after Gluten's rewrite.
+  // Backed by `selectedPartitions` -> `relation.location.listFiles(partitionFilters, dataFilters)`,
+  // which is the exact call site of issue #10511: pre-fix, physical-named partition filters
+  // could not match Delta's logical partition schema, so this returned all directories. We use
+  // `getPartitionArray` rather than `getPartitions` because the latter reflects post-coalesce
+  // splits (Velox may merge small files into one split, hiding the per-partition count).
+  private def selectedPartitionCount(df: org.apache.spark.sql.DataFrame): Int = {
+    val scan = df.queryExecution.executedPlan.collect {
+      case f: DeltaScanTransformer => f
+    }.head
+    scan.getPartitionArray.length
+  }
+
+  // Regression for issue #10511: with column mapping, a partition column filter must prune
+  // partitions correctly. Pre-fix, Gluten rewrote partition filters to physical names, which
+  // broke `PreparedDeltaFileIndex.matchingFiles` and silently returned all files.
+  Seq("name", "id").foreach {
+    mode =>
+      testWithMinSparkVersion(
+        s"column mapping mode = $mode with partition filter (single partition col)",
+        "3.2") {
+        withTable("delta_cm_part") {
+          spark.sql(s"""
+                       |create table delta_cm_part (id int, name string) using delta
+                       |partitioned by (id)
+                       |tblproperties ("delta.columnMapping.mode" = "$mode")
+                       |""".stripMargin)
+          // Use multiple inserts so each value lands in its own partition directory & file.
+          spark.sql("insert into delta_cm_part values (1, \"v1\")")
+          spark.sql("insert into delta_cm_part values (2, \"v2\")")
+          spark.sql("insert into delta_cm_part values (3, \"v3\")")
+
+          // Equality on partition column. 1 of 3 partitions matches.
+          val df1 = runQueryAndCompare("select name from delta_cm_part where id = 2") { _ => }
+          checkLengthAndPlan(df1, 1)
+          checkAnswer(df1, Row("v2") :: Nil)
+          assert(deltaInputFileCount(df1) == 1, "Delta should prune to 1 file")
+          assert(selectedPartitionCount(df1) == 1, "native scan should see 1 split")
+
+          // Range on partition column (the exact case from the bug report).
+          val df2 = runQueryAndCompare("select name from delta_cm_part where id > 2") { _ => }
+          checkLengthAndPlan(df2, 1)
+          checkAnswer(df2, Row("v3") :: Nil)
+          assert(deltaInputFileCount(df2) == 1)
+          assert(selectedPartitionCount(df2) == 1)
+
+          // IN list on partition column. 2 of 3 partitions match.
+          val df3 =
+            runQueryAndCompare("select name from delta_cm_part where id in (1, 3)") { _ => }
+          checkLengthAndPlan(df3, 2)
+          checkAnswer(df3, Row("v1") :: Row("v3") :: Nil)
+          assert(deltaInputFileCount(df3) == 2)
+          assert(selectedPartitionCount(df3) == 2)
+
+          // No filter -- baseline: all 3 partitions read.
+          val dfAll = runQueryAndCompare("select name from delta_cm_part") { _ => }
+          assert(deltaInputFileCount(dfAll) == 3)
+          assert(selectedPartitionCount(dfAll) == 3)
+        }
+      }
+
+      testWithMinSparkVersion(
+        s"column mapping mode = $mode with partition filter (multi partition col)",
+        "3.2") {
+        withTable("delta_cm_part_multi") {
+          spark.sql(s"""
+                       |create table delta_cm_part_multi
+                       |  (id int, region string, name string)
+                       |using delta partitioned by (region, id)
+                       |tblproperties ("delta.columnMapping.mode" = "$mode")
+                       |""".stripMargin)
+          spark.sql("insert into delta_cm_part_multi values (1, \"us\", \"v1\")")
+          spark.sql("insert into delta_cm_part_multi values (2, \"us\", \"v2\")")
+          spark.sql("insert into delta_cm_part_multi values (1, \"eu\", \"v3\")")
+          spark.sql("insert into delta_cm_part_multi values (2, \"eu\", \"v4\")")
+
+          val df = runQueryAndCompare(
+            "select name from delta_cm_part_multi where region = 'us' and id > 1") { _ => }
+          checkLengthAndPlan(df, 1)
+          checkAnswer(df, Row("v2") :: Nil)
+          assert(deltaInputFileCount(df) == 1, "Delta should prune to 1 file with both filters")
+          assert(selectedPartitionCount(df) == 1)
+
+          // Filter on only one of two partition columns.
+          val df2 = runQueryAndCompare(
+            "select name from delta_cm_part_multi where region = 'eu'") { _ => }
+          checkLengthAndPlan(df2, 2)
+          checkAnswer(df2, Row("v3") :: Row("v4") :: Nil)
+          assert(deltaInputFileCount(df2) == 2)
+          assert(selectedPartitionCount(df2) == 2)
+        }
+      }
+
+      testWithMinSparkVersion(
+        s"column mapping mode = $mode with partition + data filter",
+        "3.2") {
+        withTable("delta_cm_part_data") {
+          spark.sql(s"""
+                       |create table delta_cm_part_data (id int, name string, age int)
+                       |using delta partitioned by (id)
+                       |tblproperties ("delta.columnMapping.mode" = "$mode")
+                       |""".stripMargin)
+          spark.sql("insert into delta_cm_part_data values (1, \"a\", 10), (1, \"b\", 20)")
+          spark.sql("insert into delta_cm_part_data values (2, \"c\", 30), (2, \"d\", 40)")
+          spark.sql("insert into delta_cm_part_data values (3, \"e\", 50), (3, \"f\", 60)")
+
+          // Combined: partition pruning to id > 1 keeps 2 files; data stats-skipping on age >= 50
+          // further drops the id=2 file (max age 40 < 50). Should leave 1 file.
+          val df1 = runQueryAndCompare(
+            "select name from delta_cm_part_data where id > 1 and age >= 50") { _ => }
+          checkLengthAndPlan(df1, 2)
+          checkAnswer(df1, Row("e") :: Row("f") :: Nil)
+          assert(
+            deltaInputFileCount(df1) == 1,
+            "partition + stats-skipping should leave 1 file out of 3")
+
+          // Data filter alone -- file-level stats skipping should resolve column names.
+          // Only the id=2 file (age 30..40) matches age = 30.
+          val df2 = runQueryAndCompare(
+            "select name from delta_cm_part_data where age = 30") { _ => }
+          checkLengthAndPlan(df2, 1)
+          checkAnswer(df2, Row("c") :: Nil)
+          assert(
+            deltaInputFileCount(df2) == 1,
+            "stats-based file skipping should leave 1 file out of 3")
+        }
+      }
+
+      testWithMinSparkVersion(
+        s"column mapping mode = $mode with IS [NOT] NULL on partition col",
+        "3.2") {
+        withTable("delta_cm_part_null") {
+          spark.sql(s"""
+                       |create table delta_cm_part_null (id int, name string)
+                       |using delta partitioned by (id)
+                       |tblproperties ("delta.columnMapping.mode" = "$mode")
+                       |""".stripMargin)
+          spark.sql("insert into delta_cm_part_null values (1, \"v1\")")
+          spark.sql("insert into delta_cm_part_null values (2, \"v2\")")
+          spark.sql("insert into delta_cm_part_null values (cast(null as int), \"vn\")")
+
+          val df1 = runQueryAndCompare(
+            "select name from delta_cm_part_null where id is null") { _ => }
+          checkAnswer(df1, Row("vn") :: Nil)
+          assert(deltaInputFileCount(df1) == 1)
+          assert(selectedPartitionCount(df1) == 1)
+
+          val df2 = runQueryAndCompare(
+            "select name from delta_cm_part_null where id is not null") { _ => }
+          checkAnswer(df2, Row("v1") :: Row("v2") :: Nil)
+          assert(deltaInputFileCount(df2) == 2)
+          assert(selectedPartitionCount(df2) == 2)
+        }
+      }
+
+      testWithMinSparkVersion(
+        s"column mapping mode = $mode partition filter survives column rename",
+        "3.2") {
+        withTable("delta_cm_part_rename") {
+          spark.sql(s"""
+                       |create table delta_cm_part_rename (id int, name string)
+                       |using delta partitioned by (id)
+                       |tblproperties ("delta.columnMapping.mode" = "$mode")
+                       |""".stripMargin)
+          spark.sql("insert into delta_cm_part_rename values (1, \"v1\")")
+          spark.sql("insert into delta_cm_part_rename values (2, \"v2\")")
+          spark.sql("insert into delta_cm_part_rename values (3, \"v3\")")
+          // Rename the partition column. The physical name in storage stays the same; only the
+          // logical name changes, so the logical-name-based partition filter must still resolve.
+          spark.sql("alter table delta_cm_part_rename rename column id to pid")
+
+          val df = runQueryAndCompare(
+            "select name from delta_cm_part_rename where pid >= 2") { _ => }
+          checkLengthAndPlan(df, 2)
+          checkAnswer(df, Row("v2") :: Row("v3") :: Nil)
+          assert(deltaInputFileCount(df) == 2)
+          assert(selectedPartitionCount(df) == 2)
+        }
+      }
+
+      testWithMinSparkVersion(
+        s"column mapping mode = $mode data column rename + filter (file skipping)",
+        "3.2") {
+        withTable("delta_cm_data_rename") {
+          spark.sql(s"""
+                       |create table delta_cm_data_rename (id int, age int, name string)
+                       |using delta
+                       |tblproperties ("delta.columnMapping.mode" = "$mode")
+                       |""".stripMargin)
+          spark.sql("insert into delta_cm_data_rename values (1, 10, \"a\")")
+          spark.sql("insert into delta_cm_data_rename values (2, 20, \"b\")")
+          spark.sql("insert into delta_cm_data_rename values (3, 30, \"c\")")
+          // Rename a data column. Filter pushdown must still match physical column in parquet,
+          // and Delta's stats-based skipping must still resolve the logical name `years`.
+          spark.sql("alter table delta_cm_data_rename rename column age to years")
+
+          val df = runQueryAndCompare(
+            "select name from delta_cm_data_rename where years = 20") { _ => }
+          checkLengthAndPlan(df, 1)
+          checkAnswer(df, Row("b") :: Nil)
+          assert(
+            deltaInputFileCount(df) == 1,
+            "stats skipping on renamed data column should leave 1 file")
+        }
+      }
+  }
+
   test("delta: time travel") {
     withTable("delta_tm") {
       spark.sql(s"""


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to