[
https://issues.apache.org/jira/browse/SPARK-51831?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Wenchen Fan reassigned SPARK-51831:
-----------------------------------
Assignee: Junqing Li
> No Column Pruning while using ExistJoin and datasource v2
> ----------------------------------------------------------
>
> Key: SPARK-51831
> URL: https://issues.apache.org/jira/browse/SPARK-51831
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.5.5, 4.1.0, 4.0.0
> Reporter: Junqing Li
> Assignee: Junqing Li
> Priority: Major
> Labels: pull-request-available
>
> Recently, I have been testing TPC-DS queries on DataSource V2 and noticed
> that column pruning does not happen in scenarios involving {{EXISTS
> (SELECT * FROM ... WHERE ...)}}. As a result, the scan ends up reading all
> columns instead of only the required ones. The issue is reproducible with
> queries such as Q10, Q16, Q35, Q69, and Q94.
> The following simplified example demonstrates the problem.
> {code:java}
> test("Test exist join with v2 source plan") {
>   import org.apache.spark.sql.functions._
>   withTempPath { dir =>
>     spark.range(100)
>       .withColumn("col1", col("id") + 1)
>       .withColumn("col2", col("id") + 2)
>       .withColumn("col3", col("id") + 3)
>       .withColumn("col4", col("id") + 4)
>       .withColumn("col5", col("id") + 5)
>       .withColumn("col6", col("id") + 6)
>       .withColumn("col7", col("id") + 7)
>       .withColumn("col8", col("id") + 8)
>       .withColumn("col9", col("id") + 9)
>       .write
>       .mode("overwrite")
>       .parquet(dir.getCanonicalPath + "/t1")
>     spark.range(10).write.mode("overwrite").parquet(dir.getCanonicalPath + "/t2")
>     Seq("parquet", "").foreach { v1SourceList =>
>       withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> v1SourceList) {
>         spark.read.parquet(dir.getCanonicalPath + "/t1").createOrReplaceTempView("t1")
>         spark.read.parquet(dir.getCanonicalPath + "/t2").createOrReplaceTempView("t2")
>         spark.sql(
>           """
>             |select sum(t1.id) as sum_id
>             |from t1, t2
>             |where t1.id == t2.id
>             |  and exists(select * from t1 where t1.id == t2.id)
>             |""".stripMargin).explain()
>       }
>     }
>   }
> } {code}
> After execution, we can observe the following:
> * {*}With DataSource V1{*}, the query plan clearly shows column pruning:
> only the {{id}} column is read during the scan.
> {code:java}
> BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint,
> true]),false), [plan_id=90]
> +- FileScan parquet [id#32L] Batched: true,
> DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1
> paths)[file:/private/var/folders/bb/4fvsn8r949d3kghh68lx3sqr0000gp/T/spark-e0...,
> PartitionFilters: [], PushedFilters: [], ReadSchema: struct<id:bigint> {code}
> * {*}With DataSource V2{*}, the query plan shows that no column pruning is
> applied: all columns are read from the underlying data source.
> {code:java}
> BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint,
> true]),false), [plan_id=152]
> +- Project [id#58L]
> +- BatchScan parquet
> file:/private/var/folders/bb/4fvsn8r949d3kghh68lx3sqr0000gp/T/spark-e0ad680b-573c-4b11-b1b5-f50ee38fa81a/t1[id#58L,
> col1#59L, col2#60L, col3#61L, col4#62L, col5#63L, col6#64L, col7#65L,
> col8#66L, col9#67L] ParquetScan DataFilters: [], Format: parquet, Location:
> InMemoryFileIndex(1
> paths)[file:/private/var/folders/bb/4fvsn8r949d3kghh68lx3sqr0000gp/T/spark-e0...,
> PartitionFilters: [], PushedAggregation: [], PushedFilters: [],
> PushedGroupBy: [], ReadSchema:
> struct<id:bigint,col1:bigint,col2:bigint,col3:bigint,col4:bigint,col5:bigint,col6:bigint,col7:big...
> RuntimeFilters: []{code}
>
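For context, a DSv2 scan opts into column pruning by implementing Spark's {{SupportsPushDownRequiredColumns}} interface (a Java interface whose single {{pruneColumns(StructType)}} method receives the columns the query actually references); when the optimizer never invokes it, as in the V2 plan above, the read schema stays full. Below is a minimal, dependency-free Java sketch of that pruning contract ({{PruneSketch}} and its members are hypothetical illustration, not Spark code); applied to the repro's {{t1}}, it produces the V1-style {{struct<id>}} read schema.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class PruneSketch {
    // Full schema of t1 from the repro: id plus col1..col9.
    static List<String> fullSchema() {
        List<String> cols = new ArrayList<>();
        cols.add("id");
        for (int i = 1; i <= 9; i++) {
            cols.add("col" + i);
        }
        return cols;
    }

    // Model of pruneColumns: keep only the columns the query references.
    // The EXISTS subquery and the outer join in the repro only touch t1.id.
    static List<String> prune(List<String> full, Set<String> referenced) {
        return full.stream()
                .filter(referenced::contains)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> readSchema = prune(fullSchema(), Set.of("id"));
        // With pruning applied, only `id` survives into the read schema.
        System.out.println("struct<" + String.join(",", readSchema) + ">");
    }
}
```

With pruning in effect this prints {{struct<id>}}, matching the V1 plan's {{ReadSchema}}; without it, all ten columns would appear, as in the V2 plan.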
--
This message was sent by Atlassian Jira
(v8.20.10#820010)