[ https://issues.apache.org/jira/browse/SPARK-32041?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17272102#comment-17272102 ]
Peter Toth commented on SPARK-32041:
------------------------------------
Let me reopen this ticket, as it is not a duplicate of SPARK-29375 but rather
a bug in its own right. The connection between this ticket, SPARK-29375 and
SPARK-28940 is that my PR (https://github.com/apache/spark/pull/28885) fixes
all of them.
> Exchange reuse won't work in cases when DPP, subqueries are involved
> --------------------------------------------------------------------
>
> Key: SPARK-32041
> URL: https://issues.apache.org/jira/browse/SPARK-32041
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 2.4.6, 3.0.0
> Reporter: Prakhar Jain
> Priority: Major
>
> When an Exchange node is repeated at multiple places in the physical plan,
> and that exchange carries a DPP (dynamic partition pruning) subquery filter,
> ReuseExchange doesn't kick in for it, and separate stages are launched to
> compute the same thing.
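> For context: Spark's ReuseExchange rule replaces an Exchange whose
> canonicalized plan has already been seen with a ReusedExchange node pointing
> at the first occurrence. A minimal sketch of that matching idea (simplified;
> the real rule in org.apache.spark.sql.execution.exchange.ReuseExchange also
> groups candidates by output schema):
> {noformat}
> import scala.collection.mutable
>
> import org.apache.spark.sql.execution.SparkPlan
> import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
>
> // Simplified sketch: two exchanges with equal canonicalized plans compute
> // the same result, so every later occurrence can be replaced by a
> // ReusedExchangeExec that points at the first one.
> def reuseExchanges(plan: SparkPlan): SparkPlan = {
>   val seen = mutable.Map.empty[SparkPlan, Exchange]
>   plan.transformUp {
>     case e: Exchange =>
>       seen.get(e.canonicalized) match {
>         case Some(first) => ReusedExchangeExec(e.output, first)
>         case None =>
>           seen(e.canonicalized) = e
>           e
>       }
>   }
> }
> {noformat}
> The failure mode reported below is that this replacement either doesn't fire,
> or leaves the ReusedExchange pointing at an exchange that is no longer part
> of the final plan.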
> Example:
> {noformat}
> // generate data
> val factData = (1 to 100).map(i => (i % 5, i % 20, i))
> factData.toDF("store_id", "product_id", "units_sold")
>   .write
>   .partitionBy("store_id")
>   .format("parquet")
>   .saveAsTable("fact_stats")
>
> val dimData = Seq[(Int, String, String)](
>   (1, "AU", "US"),
>   (2, "CA", "US"),
>   (3, "KA", "IN"),
>   (4, "DL", "IN"),
>   (5, "GA", "PA"))
> dimData.toDF("store_id", "state_province", "country")
>   .write
>   .format("parquet")
>   .saveAsTable("dim_stats")
>
> sql("ANALYZE TABLE fact_stats COMPUTE STATISTICS FOR COLUMNS store_id")
> sql("ANALYZE TABLE dim_stats COMPUTE STATISTICS FOR COLUMNS store_id")
>
> // Set Configs
> spark.sql("set spark.sql.optimizer.dynamicPartitionPruning.reuseBroadcastOnly=true")
> spark.sql("set spark.sql.autoBroadcastJoinThreshold=1000")
>
> val query = """
>   With view1 as (
>     SELECT product_id, f.store_id
>     FROM fact_stats f JOIN dim_stats
>     ON f.store_id = dim_stats.store_id WHERE dim_stats.country = 'IN')
>   SELECT * FROM view1 v1 join view1 v2 WHERE v1.product_id = v2.product_id
> """
> val df = spark.sql(query)
> println(df.queryExecution.executedPlan)
> {noformat}
> {noformat}
> Plan:
> *(7) SortMergeJoin [product_id#1968], [product_id#2060], Inner
> :- *(3) Sort [product_id#1968 ASC NULLS FIRST], false, 0
> :  +- Exchange hashpartitioning(product_id#1968, 5), true, [id=#1140]
> :     +- *(2) Project [product_id#1968, store_id#1970]
> :        +- *(2) BroadcastHashJoin [store_id#1970], [store_id#1971], Inner, BuildRight
> :           :- *(2) Project [product_id#1968, store_id#1970]
> :           :  +- *(2) Filter isnotnull(product_id#1968)
> :           :     +- *(2) ColumnarToRow
> :           :        +- FileScan parquet default.fact_stats[product_id#1968,store_id#1970] Batched: true, DataFilters: [isnotnull(product_id#1968)], Format: Parquet, Location: InMemoryFileIndex[file:/home/prakhar/src/os/1_spark/sql/core/spark-warehouse/org.apache.spark.sql..., PartitionFilters: [isnotnull(store_id#1970), dynamicpruningexpression(store_id#1970 IN dynamicpruning#2067)], PushedFilters: [IsNotNull(product_id)], ReadSchema: struct<product_id:int>
> :           :              +- SubqueryBroadcast dynamicpruning#2067, 0, [store_id#1971], [id=#1131]
> :           :                 +- ReusedExchange [store_id#1971], BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#1021]
> :           +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint))), [id=#1021]
> :              +- *(1) Project [store_id#1971]
> :                 +- *(1) Filter ((isnotnull(country#1973) AND (country#1973 = IN)) AND isnotnull(store_id#1971))
> :                    +- *(1) ColumnarToRow
> :                       +- FileScan parquet default.dim_stats[store_id#1971,country#1973] Batched: true, DataFilters: [isnotnull(country#1973), (country#1973 = IN), isnotnull(store_id#1971)], Format: Parquet, Location: InMemoryFileIndex[file:/home/prakhar/src/os/1_spark/sql/core/spark-warehouse/org.apache.spark.sql..., PartitionFilters: [], PushedFilters: [IsNotNull(country), EqualTo(country,IN), IsNotNull(store_id)], ReadSchema: struct<store_id:int,country:string>
> +- *(6) Sort [product_id#2060 ASC NULLS FIRST], false, 0
>    +- ReusedExchange [product_id#2060, store_id#2062], Exchange hashpartitioning(product_id#1968, 5), true, [id=#1026]
> {noformat}
> Issue:
> Note the last line of the plan: it is a ReusedExchange pointing to id=1026,
> but there is no Exchange node with id 1026 anywhere in the plan. The
> ReusedExchange references the wrong child node (1026 instead of 1140), so
> exchange reuse doesn't actually happen in this query.
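> One way to confirm that reuse doesn't happen (a quick check, assuming
> Spark 3.x, where every SparkPlan node carries the unique id printed as
> [id=#...]) is to collect the ids of the Exchange nodes in the executed plan
> and compare them with the ids that the ReusedExchange nodes point at:
> {noformat}
> import org.apache.spark.sql.execution.exchange.{Exchange, ReusedExchangeExec}
>
> val plan = df.queryExecution.executedPlan
> // ids of the Exchange nodes actually present in the executed plan
> val exchangeIds = plan.collect { case e: Exchange => e.id }
> // ids of the exchanges that the ReusedExchange nodes claim to reuse
> val reusedIds = plan.collect { case r: ReusedExchangeExec => r.child.id }
> reusedIds.foreach { id =>
>   println(s"ReusedExchange -> id=$id, present in plan: ${exchangeIds.contains(id)}")
> }
> {noformat}
> For the plan above this should report false for id 1026, since no Exchange
> with that id exists in the plan.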
> Another query where the issue is caused by ReuseSubquery:
> {noformat}
> spark.sql("set spark.sql.autoBroadcastJoinThreshold=-1")
> val query1 = """
> | With view1 as (
> | SELECT product_id, units_sold
> | FROM fact_stats
> | WHERE store_id = (SELECT max(store_id) FROM dim_stats)
> | and units_sold = 2
> | ), view2 as (
> | SELECT product_id, units_sold
> | FROM fact_stats
> | WHERE store_id = (SELECT max(store_id) FROM dim_stats)
> | and units_sold = 1
> | )
> |
> | SELECT *
> | FROM view1 v1 join view2 v2 join view2 v3
> | WHERE v1.product_id = v2.product_id and v2.product_id =
> v3.product_id
> """
> // Here we are joining v2 with self. So it should use ReuseExchange. But
> final plan computes v2 twice.
> val df = spark.sql(query1);
> println(df.queryExecution.executedPlan){noformat}
> Here we join view2 with itself, so ReuseExchange should kick in, but the
> final plan computes v2 twice.
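> A rough way to see the double computation is to count scans and reuse nodes
> in the plan string (a quick heuristic, not an exact check):
> {noformat}
> val planString = df.queryExecution.executedPlan.toString
> // With working reuse, the second occurrence of view2 would show up as a
> // ReusedExchange instead of a second full fact_stats subtree.
> val scans = planString.split("\n").count(_.contains("default.fact_stats"))
> val reused = planString.split("\n").count(_.contains("ReusedExchange"))
> println(s"fact_stats scans: $scans, ReusedExchange nodes: $reused")
> {noformat}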
>