This is an automated email from the ASF dual-hosted git repository.
zhztheplayer pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gluten.git
The following commit(s) were added to refs/heads/main by this push:
new bdaad2a047 [GLUTEN-10511][VL][Delta] Fix wrong result with partition
filters under column mapping (#12240)
bdaad2a047 is described below
commit bdaad2a047c91d98e98f610e64cb75ce80f0f6df
Author: EJ Song <[email protected]>
AuthorDate: Tue Jun 9 02:25:05 2026 -0700
[GLUTEN-10511][VL][Delta] Fix wrong result with partition filters under
column mapping (#12240)
Co-authored-by: Claude Opus 4.7 (1M context) <[email protected]>
---
.../gluten/execution/DeltaScanTransformer.scala | 31 ++-
.../gluten/extension/DeltaPostTransformRules.scala | 90 ++++++---
.../org/apache/gluten/execution/DeltaSuite.scala | 213 +++++++++++++++++++++
3 files changed, 305 insertions(+), 29 deletions(-)
diff --git
a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
index 1be03dd404..154b735554 100644
---
a/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
+++
b/gluten-delta/src/main/scala/org/apache/gluten/execution/DeltaScanTransformer.scala
@@ -20,9 +20,10 @@ import org.apache.gluten.sql.shims.SparkShimLoader
import org.apache.gluten.substrait.rel.LocalFilesNode.ReadFileFormat
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
+import org.apache.spark.sql.catalyst.expressions.{Attribute,
AttributeReference, Expression}
import org.apache.spark.sql.catalyst.plans.QueryPlan
import org.apache.spark.sql.connector.read.streaming.SparkDataStream
+import org.apache.spark.sql.delta.{DeltaParquetFileFormat, NoMapping}
import org.apache.spark.sql.execution.FileSourceScanExec
import org.apache.spark.sql.execution.datasources.HadoopFsRelation
import org.apache.spark.sql.types.StructType
@@ -55,6 +56,34 @@ case class DeltaScanTransformer(
override lazy val fileFormat: ReadFileFormat =
ReadFileFormat.ParquetReadFormat
+ // Column-mapping tables keep `dataFilters` on the scan node LOGICAL-named: Delta's file index
+ // (`PreparedDeltaFileIndex.matchingFiles`, `Snapshot.filesForScan`) resolves filter attributes
+ // against logical schemas for both partition pruning and stats-based file skipping.
+ //
+ // The native (Velox) side needs PHYSICAL names instead. `output` and `dataSchema` are physical
+ // so the parquet reader can locate columns. `BasicScanExecTransformer` matches `scanFilters`
+ // against `pushDownFilters` using `AttributeReference.equals`, which compares names.
+ // `pushDownFilters` is built from a `Filter` over the physical-named scan output. Logical-named
+ // scan filters would therefore never match, and the substrait plan would evaluate the filter
+ // twice.
+ //
+ // Attributes are translated by looking up their exprId in `output`, rather than by re-running
+ // Delta's column-mapping helpers. ExprIds are stable across the post-transform rewrite, and no
+ // second metadata lookup is needed. An attribute with no exprId match in `output` is kept as-is.
+ //
+ // See `DeltaPostTransformRules.transformColumnMappingPlan` for which fields stay logical and
+ // which become physical. It also describes the longer-term cleanup: translate to physical names
+ // at substrait emission time, so that this override and the alias-back ProjectExec go away.
+ override def scanFilters: Seq[Expression] = relation.fileFormat match {
+   case deltaFormat: DeltaParquetFileFormat if deltaFormat.columnMappingMode != NoMapping =>
+     val outputAttrByExprId = output.iterator.collect {
+       case attr: AttributeReference => attr.exprId -> attr
+     }.toMap
+     def toPhysical(filter: Expression): Expression = filter.transformDown {
+       case attr: AttributeReference => outputAttrByExprId.getOrElse(attr.exprId, attr)
+     }
+     dataFilters.map(toPhysical)
+   case _ =>
+     dataFilters
+ }
+
override protected def doValidateInternal(): ValidationResult = {
if (
requiredSchema.fields.exists(
diff --git
a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
index e16a6d12fd..421db09732 100644
---
a/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
+++
b/gluten-delta/src/main/scala/org/apache/gluten/extension/DeltaPostTransformRules.scala
@@ -164,20 +164,55 @@ object DeltaPostTransformRules {
}
/**
- * This method is only used for Delta ColumnMapping FileFormat(e.g.
nameMapping and idMapping)
- * transform the metadata of Delta into Parquet's, each plan should only be
transformed once.
+ * Used for Delta ColumnMapping FileFormat (nameMapping and idMapping). Each
plan is transformed
+ * at most once; the first run is tagged so re-runs are no-ops.
+ *
+ * Background: with column mapping, Delta files are written with PHYSICAL
column names while
+ * Delta's metadata (partition schema, column stats) keeps LOGICAL names.
Vanilla Spark + Delta
+ * resolves this asymmetry inside
`DeltaParquetFileFormat.buildReaderWithPartitionValues`:
+ * everything on the scan node stays logical, and physical translation
happens just-in-time when
+ * handing data and filters to the parquet reader. Gluten bypasses that hook
(it goes to native
+ * via Substrait), so the translation has to live somewhere on our side.
+ *
+ * What this rule produces -- the parts that diverge from vanilla Spark are
commented at each
+ * site. The split-by-consumer is asymmetric on purpose:
+ *
+ * - `output`, `dataSchema`, and the data fields of `requiredSchema` ==>
PHYSICAL. These flow
+ * into the substrait `NamedStruct` that Velox uses to look up columns
in the parquet file.
+ * The parquet column name is the physical name, so Velox needs the
physical name on the
+ * schema side. A `ProjectExecTransformer` is added below to alias these
back to logical names
+ * for downstream Spark operators.
+ * - `partitionSchema`, `partitionFilters`, `dataFilters`, partition
fields of `requiredSchema`
+ * ==> LOGICAL. These are consumed by Delta's
`PreparedDeltaFileIndex.matchingFiles` and
+ * `Snapshot.filesForScan`, which resolve filters and partition values
against
+ * `metadata.partitionSchema` and the column-stats schema -- both
LOGICAL. Rewriting any of
+ * these to physical names was the cause of issue #10511 (partition
pruning silently no-op'd)
+ * and would also disable file-level stats skipping.
+ * - `DeltaScanTransformer.scanFilters` (override) ==> PHYSICAL,
translated from `dataFilters`
+ * by exprId match against `output`. Substrait binds filters by exprId
rather than name, so it
+ * would be tempting to pass logical-named filters straight through; but
+ * `BasicScanExecTransformer.filterExprs()` does a name-and-exprId
equality check
+ * (`scanFilters.partition(pushDownFilters.contains(_))`) against the
physical-named
+ * `pushDownFilters` from the upstream `Filter`. The override ensures
both sides match.
+ *
+ * Future cleanup (out of scope for this fix): the cleaner shape is to
mirror vanilla Spark
+ * exactly -- keep EVERYTHING on the scan node logical, and do physical
translation only at
+ * substrait emission time (e.g. inside the `NamedStruct`/`ReadRel` build in
+ * `BasicScanExecTransformer.doTransform`). That removes the alias-back
project below and the
+ * `scanFilters` override, but it requires plumbing Delta-specific
physical-name lookup into the
+ * substrait emitter and is a multi-module refactor.
*/
private def transformColumnMappingPlan(plan: SparkPlan): SparkPlan = plan
match {
case plan: DeltaScanTransformer =>
val fmt = plan.relation.fileFormat.asInstanceOf[DeltaParquetFileFormat]
- // transform HadoopFsRelation
val relation = plan.relation
+ val partitionColNames =
relation.partitionSchema.fields.iterator.map(_.name).toSet
+ def isPartitionCol(name: String): Boolean =
partitionColNames.contains(name)
+
+ // transform HadoopFsRelation: only `dataSchema` needs physical names
(those are the
+ // columns actually stored in parquet). `partitionSchema` stays logical.
val newFsRelation = relation.copy(
- partitionSchema = DeltaColumnMapping.createPhysicalSchema(
- relation.partitionSchema,
- fmt.referenceSchema,
- fmt.columnMappingMode),
dataSchema = DeltaColumnMapping.createPhysicalSchema(
relation.dataSchema,
fmt.referenceSchema,
@@ -193,6 +228,8 @@ object DeltaPostTransformRules {
attr
} else if (isInputFileRelatedAttribute(attr)) {
attr
+ } else if (isPartitionCol(attr.name)) {
+ attr
} else {
DeltaColumnMapping
.createPhysicalAttributes(Seq(attr), fmt.referenceSchema,
fmt.columnMappingMode)
@@ -204,31 +241,28 @@ object DeltaPostTransformRules {
newAttr
}
val newOutput = plan.output.map(o => mapAttribute(o))
- // transform dataFilters
- val newDataFilters = plan.dataFilters.map {
- e =>
- e.transformDown {
- case attr: AttributeReference =>
- mapAttribute(attr)
- }
- }
- // transform partitionFilters
- val newPartitionFilters = plan.partitionFilters.map {
- e =>
- e.transformDown {
- case attr: AttributeReference =>
- mapAttribute(attr)
- }
- }
- // replace tableName in schema with physicalName
+ // dataFilters / partitionFilters: kept LOGICAL on the scan node so
Delta's file index
+ // (partition pruning + stats-based file skipping) resolves columns
correctly. The native
+ // (Velox) side gets physical-translated copies via
`DeltaScanTransformer.scanFilters`.
+ val newDataFilters = plan.dataFilters
+ val newPartitionFilters = plan.partitionFilters
+
+ // requiredSchema: rewrite data fields to physical, keep partition
fields logical.
+ val physicalRequiredSchema = DeltaColumnMapping.createPhysicalSchema(
+ plan.requiredSchema,
+ fmt.referenceSchema,
+ fmt.columnMappingMode)
+ val newRequiredSchema = StructType(
+ physicalRequiredSchema.fields.zip(plan.requiredSchema.fields).map {
+ case (_, logical) if isPartitionCol(logical.name) => logical
+ case (physical, _) => physical
+ })
+
val scanExecTransformer = new DeltaScanTransformer(
newFsRelation,
plan.stream,
newOutput,
- DeltaColumnMapping.createPhysicalSchema(
- plan.requiredSchema,
- fmt.referenceSchema,
- fmt.columnMappingMode),
+ newRequiredSchema,
newPartitionFilters,
plan.optionalBucketSet,
plan.optionalNumCoalescedBuckets,
diff --git
a/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala
b/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala
index 031bf46034..1f95ec7dac 100644
--- a/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala
+++ b/gluten-delta/src/test/scala/org/apache/gluten/execution/DeltaSuite.scala
@@ -82,6 +82,219 @@ abstract class DeltaSuite extends
WholeStageTransformerSuite {
}
}
+ // Returns the number of files Delta will read for `df`, taken from `Dataset.inputFiles`.
+ // Per the original intent, this reflects `PreparedDeltaFileIndex.inputFiles`: the file set after
+ // `PrepareDeltaScan` has applied partition pruning and stats-based skipping. Tests use it to
+ // assert that Delta's own file index pruned, independently of what Gluten does afterwards.
+ private def deltaInputFileCount(df: org.apache.spark.sql.DataFrame): Int = {
+   val files = df.inputFiles
+   files.size
+ }
+
+ // Returns the number of partition directories selected by the executed scan after Gluten's
+ // rewrite. `getPartitionArray` is backed by `selectedPartitions` ->
+ // `relation.location.listFiles(partitionFilters, dataFilters)`, which is where issue #10511
+ // surfaced. Before the fix, physical-named partition filters could not match Delta's logical
+ // partition schema, so every directory was returned.
+ //
+ // `getPartitionArray` is used instead of `getPartitions`. The latter reflects post-coalesce
+ // splits: Velox may merge small files into one split, which hides the per-partition count.
+ //
+ // If the executed plan contains no DeltaScanTransformer (e.g. the scan fell back to vanilla
+ // Spark), the test fails with the plan in the message instead of a bare NoSuchElementException.
+ private def selectedPartitionCount(df: org.apache.spark.sql.DataFrame): Int = {
+   val plan = df.queryExecution.executedPlan
+   val scan = plan
+     .collectFirst { case f: DeltaScanTransformer => f }
+     .getOrElse(fail(s"Expected a DeltaScanTransformer in the executed plan:\n$plan"))
+   scan.getPartitionArray.length
+ }
+
+ // Regression for issue #10511: with column mapping, a partition column filter must prune
+ // partitions correctly. Pre-fix, Gluten rewrote partition filters to physical names, which
+ // broke `PreparedDeltaFileIndex.matchingFiles` and silently returned all files.
+ //
+ // Most cases check pruning at two levels:
+ //   - `deltaInputFileCount`: the files in Delta's own file index.
+ //   - `selectedPartitionCount`: the partition directories the Gluten scan actually lists.
+ Seq("name", "id").foreach {
+   mode =>
+     testWithMinSparkVersion(
+       s"column mapping mode = $mode with partition filter (single partition col)",
+       "3.2") {
+       withTable("delta_cm_part") {
+         spark.sql(s"""
+                      |create table delta_cm_part (id int, name string) using delta
+                      |partitioned by (id)
+                      |tblproperties ("delta.columnMapping.mode" = "$mode")
+                      |""".stripMargin)
+         // Use multiple inserts so each value lands in its own partition directory & file.
+         spark.sql("insert into delta_cm_part values (1, \"v1\")")
+         spark.sql("insert into delta_cm_part values (2, \"v2\")")
+         spark.sql("insert into delta_cm_part values (3, \"v3\")")
+
+         // Equality on partition column. 1 of 3 partitions matches.
+         val df1 = runQueryAndCompare("select name from delta_cm_part where id = 2") { _ => }
+         checkLengthAndPlan(df1, 1)
+         checkAnswer(df1, Row("v2") :: Nil)
+         assert(deltaInputFileCount(df1) == 1, "Delta should prune to 1 file")
+         // `selectedPartitionCount` counts selected partition directories, not splits.
+         assert(selectedPartitionCount(df1) == 1, "native scan should select 1 partition")
+
+         // Range on partition column (the exact case from the bug report).
+         val df2 = runQueryAndCompare("select name from delta_cm_part where id > 2") { _ => }
+         checkLengthAndPlan(df2, 1)
+         checkAnswer(df2, Row("v3") :: Nil)
+         assert(deltaInputFileCount(df2) == 1)
+         assert(selectedPartitionCount(df2) == 1)
+
+         // IN list on partition column. 2 of 3 partitions match.
+         val df3 =
+           runQueryAndCompare("select name from delta_cm_part where id in (1, 3)") { _ => }
+         checkLengthAndPlan(df3, 2)
+         checkAnswer(df3, Row("v1") :: Row("v3") :: Nil)
+         assert(deltaInputFileCount(df3) == 2)
+         assert(selectedPartitionCount(df3) == 2)
+
+         // No filter -- baseline: all 3 partitions read.
+         val dfAll = runQueryAndCompare("select name from delta_cm_part") { _ => }
+         assert(deltaInputFileCount(dfAll) == 3)
+         assert(selectedPartitionCount(dfAll) == 3)
+       }
+     }
+
+     testWithMinSparkVersion(
+       s"column mapping mode = $mode with partition filter (multi partition col)",
+       "3.2") {
+       withTable("delta_cm_part_multi") {
+         spark.sql(s"""
+                      |create table delta_cm_part_multi
+                      | (id int, region string, name string)
+                      |using delta partitioned by (region, id)
+                      |tblproperties ("delta.columnMapping.mode" = "$mode")
+                      |""".stripMargin)
+         spark.sql("insert into delta_cm_part_multi values (1, \"us\", \"v1\")")
+         spark.sql("insert into delta_cm_part_multi values (2, \"us\", \"v2\")")
+         spark.sql("insert into delta_cm_part_multi values (1, \"eu\", \"v3\")")
+         spark.sql("insert into delta_cm_part_multi values (2, \"eu\", \"v4\")")
+
+         val df = runQueryAndCompare(
+           "select name from delta_cm_part_multi where region = 'us' and id > 1") { _ => }
+         checkLengthAndPlan(df, 1)
+         checkAnswer(df, Row("v2") :: Nil)
+         assert(deltaInputFileCount(df) == 1, "Delta should prune to 1 file with both filters")
+         assert(selectedPartitionCount(df) == 1)
+
+         // Filter on only one of two partition columns.
+         val df2 = runQueryAndCompare(
+           "select name from delta_cm_part_multi where region = 'eu'") { _ => }
+         checkLengthAndPlan(df2, 2)
+         checkAnswer(df2, Row("v3") :: Row("v4") :: Nil)
+         assert(deltaInputFileCount(df2) == 2)
+         assert(selectedPartitionCount(df2) == 2)
+       }
+     }
+
+     testWithMinSparkVersion(
+       s"column mapping mode = $mode with partition + data filter",
+       "3.2") {
+       withTable("delta_cm_part_data") {
+         spark.sql(s"""
+                      |create table delta_cm_part_data (id int, name string, age int)
+                      |using delta partitioned by (id)
+                      |tblproperties ("delta.columnMapping.mode" = "$mode")
+                      |""".stripMargin)
+         spark.sql("insert into delta_cm_part_data values (1, \"a\", 10), (1, \"b\", 20)")
+         spark.sql("insert into delta_cm_part_data values (2, \"c\", 30), (2, \"d\", 40)")
+         spark.sql("insert into delta_cm_part_data values (3, \"e\", 50), (3, \"f\", 60)")
+
+         // Combined: partition pruning to id > 1 keeps 2 files; data stats-skipping on age >= 50
+         // further drops the id=2 file (max age 40 < 50). Should leave 1 file.
+         val df1 = runQueryAndCompare(
+           "select name from delta_cm_part_data where id > 1 and age >= 50") { _ => }
+         checkLengthAndPlan(df1, 2)
+         checkAnswer(df1, Row("e") :: Row("f") :: Nil)
+         assert(
+           deltaInputFileCount(df1) == 1,
+           "partition + stats-skipping should leave 1 file out of 3")
+
+         // Data filter alone -- file-level stats skipping should resolve column names.
+         // Only the id=2 file (age 30..40) matches age = 30.
+         val df2 = runQueryAndCompare(
+           "select name from delta_cm_part_data where age = 30") { _ => }
+         checkLengthAndPlan(df2, 1)
+         checkAnswer(df2, Row("c") :: Nil)
+         assert(
+           deltaInputFileCount(df2) == 1,
+           "stats-based file skipping should leave 1 file out of 3")
+       }
+     }
+
+     testWithMinSparkVersion(
+       s"column mapping mode = $mode with IS [NOT] NULL on partition col",
+       "3.2") {
+       withTable("delta_cm_part_null") {
+         spark.sql(s"""
+                      |create table delta_cm_part_null (id int, name string)
+                      |using delta partitioned by (id)
+                      |tblproperties ("delta.columnMapping.mode" = "$mode")
+                      |""".stripMargin)
+         spark.sql("insert into delta_cm_part_null values (1, \"v1\")")
+         spark.sql("insert into delta_cm_part_null values (2, \"v2\")")
+         spark.sql("insert into delta_cm_part_null values (cast(null as int), \"vn\")")
+
+         val df1 = runQueryAndCompare(
+           "select name from delta_cm_part_null where id is null") { _ => }
+         checkAnswer(df1, Row("vn") :: Nil)
+         assert(deltaInputFileCount(df1) == 1)
+         assert(selectedPartitionCount(df1) == 1)
+
+         val df2 = runQueryAndCompare(
+           "select name from delta_cm_part_null where id is not null") { _ => }
+         checkAnswer(df2, Row("v1") :: Row("v2") :: Nil)
+         assert(deltaInputFileCount(df2) == 2)
+         assert(selectedPartitionCount(df2) == 2)
+       }
+     }
+
+     testWithMinSparkVersion(
+       s"column mapping mode = $mode partition filter survives column rename",
+       "3.2") {
+       withTable("delta_cm_part_rename") {
+         spark.sql(s"""
+                      |create table delta_cm_part_rename (id int, name string)
+                      |using delta partitioned by (id)
+                      |tblproperties ("delta.columnMapping.mode" = "$mode")
+                      |""".stripMargin)
+         spark.sql("insert into delta_cm_part_rename values (1, \"v1\")")
+         spark.sql("insert into delta_cm_part_rename values (2, \"v2\")")
+         spark.sql("insert into delta_cm_part_rename values (3, \"v3\")")
+         // Rename the partition column. The physical name in storage stays the same; only the
+         // logical name changes, so the logical-name-based partition filter must still resolve.
+         spark.sql("alter table delta_cm_part_rename rename column id to pid")
+
+         val df = runQueryAndCompare(
+           "select name from delta_cm_part_rename where pid >= 2") { _ => }
+         checkLengthAndPlan(df, 2)
+         checkAnswer(df, Row("v2") :: Row("v3") :: Nil)
+         assert(deltaInputFileCount(df) == 2)
+         assert(selectedPartitionCount(df) == 2)
+       }
+     }
+
+     testWithMinSparkVersion(
+       s"column mapping mode = $mode data column rename + filter (file skipping)",
+       "3.2") {
+       withTable("delta_cm_data_rename") {
+         spark.sql(s"""
+                      |create table delta_cm_data_rename (id int, age int, name string)
+                      |using delta
+                      |tblproperties ("delta.columnMapping.mode" = "$mode")
+                      |""".stripMargin)
+         spark.sql("insert into delta_cm_data_rename values (1, 10, \"a\")")
+         spark.sql("insert into delta_cm_data_rename values (2, 20, \"b\")")
+         spark.sql("insert into delta_cm_data_rename values (3, 30, \"c\")")
+         // Rename a data column. Filter pushdown must still match the physical column in parquet,
+         // and Delta's stats-based skipping must still resolve the logical name `years`.
+         spark.sql("alter table delta_cm_data_rename rename column age to years")
+
+         val df = runQueryAndCompare(
+           "select name from delta_cm_data_rename where years = 20") { _ => }
+         checkLengthAndPlan(df, 1)
+         checkAnswer(df, Row("b") :: Nil)
+         assert(
+           deltaInputFileCount(df) == 1,
+           "stats skipping on renamed data column should leave 1 file")
+       }
+     }
+ }
+
test("delta: time travel") {
withTable("delta_tm") {
spark.sql(s"""
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]