Mingjia Liu created SPARK-32609:
-----------------------------------

             Summary: Incorrect exchange reuse with DataSourceV2
                 Key: SPARK-32609
                 URL: https://issues.apache.org/jira/browse/SPARK-32609
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.4.5
            Reporter: Mingjia Liu

{code:java}
# Enable exchange reuse (the setting under which the bug appears).
spark.conf.set("spark.sql.exchange.reuse", "true")

# Load the table through the DataSourceV2 connector and register it as a temp view.
df = spark.read.format("com.google.cloud.spark.bigquery.v2.BigQueryDataSourceV2").option("table", "tpcds_1G.date_dim").load()
df.createOrReplaceTempView("date_dim")

df = spark.sql("""
WITH t1 AS (
    SELECT d_year, d_month_seq
    FROM (
        SELECT t1.d_year, t2.d_month_seq
        FROM date_dim t1
        cross join date_dim t2
        where t1.d_day_name = "Monday" and t1.d_fy_year > 2000
          and t2.d_day_name = "Monday" and t2.d_fy_year > 2000
    )
    GROUP BY d_year, d_month_seq)
SELECT
    prev_yr.d_year AS prev_year,
    curr_yr.d_year AS year,
    curr_yr.d_month_seq
FROM t1 curr_yr
cross join t1 prev_yr
WHERE curr_yr.d_year=2002 AND prev_yr.d_year=2002-1
ORDER BY d_month_seq
LIMIT 100
""")
df.explain()
df.show()
{code}

The repro query:
A. defines a CTE t1 (date_dim cross joined with itself, grouped by d_year and d_month_seq)
B. cross joins t1 filtered to year 2002 (curr_yr) with t1 filtered to year 2001 (prev_yr)

With exchange reuse enabled, the plan incorrectly reuses the persisted shuffle writes of A filtered on year 2002 for the year 2001 side of the join.

{code:java}
== Physical Plan ==
TakeOrderedAndProject(limit=100, orderBy=[d_month_seq#24371L ASC NULLS FIRST], output=[prev_year#24366L,year#24367L,d_month_seq#24371L])
+- *(9) Project [d_year#24402L AS prev_year#24366L, d_year#23551L AS year#24367L, d_month_seq#24371L]
   +- CartesianProduct
      :- *(4) HashAggregate(keys=[d_year#23551L, d_month_seq#24371L], functions=[])
      :  +- Exchange hashpartitioning(d_year#23551L, d_month_seq#24371L, 200)
      :     +- *(3) HashAggregate(keys=[d_year#23551L, d_month_seq#24371L], functions=[])
      :        +- BroadcastNestedLoopJoin BuildRight, Cross
      :           :- *(1) Project [d_year#23551L]
      :           :  +- *(1) ScanV2 BigQueryDataSourceV2[d_year#23551L] (Filters: [isnotnull(d_day_name#23559), (d_day_name#23559 = Monday), isnotnull(d_fy_year#23556L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
      :           +- BroadcastExchange IdentityBroadcastMode
      :              +- *(2) Project [d_month_seq#24371L]
      :                 +- *(2) ScanV2 BigQueryDataSourceV2[d_month_seq#24371L] (Filters: [isnotnull(d_day_name#24382), (d_day_name#24382 = Monday), isnotnull(d_fy_year#24379L), (d_fy_yea..., Options: [table=tpcds_1G.date_dim,paths=[]])
      +- *(8) HashAggregate(keys=[d_year#24402L, d_month_seq#24455L], functions=[])
         +- ReusedExchange [d_year#24402L, d_month_seq#24455L], Exchange hashpartitioning(d_year#23551L, d_month_seq#24371L, 200)
{code}

The result is incorrect: prev_year should be 2001, but every row shows 2002.

{code:java}
+---------+----+-----------+
|prev_year|year|d_month_seq|
+---------+----+-----------+
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
|     2002|2002|       1212|
+---------+----+-----------+
only showing top 20 rows
{code}
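
To illustrate how a reuse decision like the one in the plan above could go wrong, here is a small toy model in plain Python. It is only a sketch and is not Spark code: `ToyScan` and `plan_exchanges` are hypothetical names. It also rests on an assumption the report does not confirm, namely that the d_year predicate is part of the pushed-down filters and that the plan comparison used for reuse does not look at those filters.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Optional, Tuple


@dataclass(frozen=True)
class ToyScan:
    """Hypothetical stand-in for a scan node; not Spark's actual class."""
    table: str
    columns: Tuple[str, ...]
    # compare=False drops this field from __eq__ and __hash__, modelling
    # (as an assumption) a comparison that ignores pushed-down filters.
    pushed_filters: Tuple[str, ...] = field(default=(), compare=False)


def plan_exchanges(scans: List[ToyScan]) -> List[Tuple[ToyScan, Optional[ToyScan]]]:
    """Pair each scan with the earlier 'equal' scan whose exchange it would reuse."""
    seen: Dict[ToyScan, ToyScan] = {}
    result = []
    for scan in scans:
        # setdefault returns the first scan registered under an equal key.
        original = seen.setdefault(scan, scan)
        result.append((scan, None if original is scan else original))
    return result


curr = ToyScan("date_dim", ("d_year", "d_month_seq"), ("d_year = 2002",))
prev = ToyScan("date_dim", ("d_year", "d_month_seq"), ("d_year = 2001",))

for scan, reused in plan_exchanges([curr, prev]):
    if reused is None:
        print(f"{scan.pushed_filters}: new exchange")
    else:
        print(f"{scan.pushed_filters}: reuses exchange built for {reused.pushed_filters}")
```

In this toy model the 2001 branch is treated as equal to the 2002 branch and so picks up the 2002 data, which matches the symptom in the result table. If `pushed_filters` took part in the comparison (dropping `compare=False`), the two scans would no longer be equal and each would get its own exchange.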