[ https://issues.apache.org/jira/browse/SPARK-39131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Alessandro Bellina updated SPARK-39131:
---------------------------------------
    Component/s: SQL

> "Exists" is optimized too late (to LeftSemi) preventing filters to be inferred
> ------------------------------------------------------------------------------
>
>                 Key: SPARK-39131
>                 URL: https://issues.apache.org/jira/browse/SPARK-39131
>             Project: Spark
>          Issue Type: Bug
>          Components: Optimizer, SQL
>    Affects Versions: 3.2.1
>            Reporter: Alessandro Bellina
>            Priority: Major
>
> We would like to propose a slight change in the order in which the logical
> plan optimizer rules run. The motivation is a performance issue we have seen
> with {{LeftSemi}} being materialized too late in the logical plan optimizer,
> so that it does not benefit from the null filtering that
> {{InferFiltersFromConstraints}} can insert.
> I have "something that works" locally (see the rest of the description for
> details and a diff), but since this is the optimizer it is not clear what
> else I could be breaking, so I'd like to hear from the experts on whether
> this is the right change.
> The query in question is based on TPC-DS query 16, which originally has an
> {{exists}} filter:
> {code:sql}
> …
> and exists (select *
>             from catalog_sales cs2
>             where cs1.cs_order_number = cs2.cs_order_number 
>               and cs1.cs_warehouse_sk <> cs2.cs_warehouse_sk)
> …
> {code}
> The rule {{RewritePredicateSubquery}} turns this into a {{LeftSemi}} join,
> which shows up in the physical plan like so:
> {code:sql}
> +- *(9) SortMergeJoin [cs_order_number#17L], [cs_order_number#872L], LeftSemi, NOT (cs_warehouse_sk#14 = cs_warehouse_sk#869)
>    :- *(2) Sort [cs_order_number#17L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(cs_order_number#17L, 200), ENSURE_REQUIREMENTS, [id=#364]
>    :     +- *(1) Filter ((isnotnull(cs_ship_date_sk#2) AND isnotnull(cs_ship_addr_sk#10)) AND isnotnull(cs_call_center_sk#11))
>    :        +- *(1) ColumnarToRow
>    :           +- FileScan parquet [...] Batched: true, DataFilters: [isnotnull(cs_ship_date_sk#2), isnotnull(cs_ship_addr_sk#10), isnotnull(cs_call_center_sk#11)],..., PushedFilters: [IsNotNull(cs_ship_date_sk), IsNotNull(cs_ship_addr_sk), IsNotNull(cs_call_center_sk)], ReadSchema: ...
>    +- *(4) Sort [cs_order_number#872L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(cs_order_number#872L, 200), ENSURE_REQUIREMENTS, [id=#372]
>          +- *(3) ColumnarToRow
>             +- FileScan parquet [...] Batched: true, DataFilters: [], ..., PushedFilters: [], ReadSchema: ...
> {code}
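The equivalence that {{RewritePredicateSubquery}} relies on can be illustrated without Spark. The sketch below is a hypothetical model in plain Scala collections, not Spark code: the {{Sale}} class keeps only the two columns used by the correlated predicate, {{whereExists}} and {{leftSemi}} are illustrative names, and all values are assumed non-null.

```scala
object ExistsAsSemiJoin {
  // Hypothetical row type: only the two catalog_sales columns the EXISTS predicate uses.
  final case class Sale(orderNumber: Long, warehouseSk: Int)

  // Correlation predicate from the subquery:
  //   cs1.cs_order_number = cs2.cs_order_number AND cs1.cs_warehouse_sk <> cs2.cs_warehouse_sk
  def matches(cs1: Sale, cs2: Sale): Boolean =
    cs1.orderNumber == cs2.orderNumber && cs1.warehouseSk != cs2.warehouseSk

  // EXISTS semantics: keep each outer row for which at least one inner row matches.
  def whereExists(outer: Seq[Sale], inner: Seq[Sale]): Seq[Sale] =
    outer.filter(o => inner.exists(i => matches(o, i)))

  // LeftSemi semantics: look up right rows by the equi-join key, apply the remaining
  // condition, and emit each left row at most once, with no right-side columns.
  def leftSemi(left: Seq[Sale], right: Seq[Sale]): Seq[Sale] = {
    val rightByKey: Map[Long, Seq[Sale]] = right.groupBy(_.orderNumber)
    left.filter { l =>
      rightByKey.getOrElse(l.orderNumber, Seq.empty[Sale]).exists(r => l.warehouseSk != r.warehouseSk)
    }
  }

  val sales: Seq[Sale] = Seq(
    Sale(1L, 10), Sale(1L, 20), // order 1 ships from two warehouses
    Sale(2L, 10), Sale(2L, 10), // order 2 ships from a single warehouse
    Sale(3L, 30)                // order 3 has a single line
  )

  def main(args: Array[String]): Unit = {
    println(whereExists(sales, sales))
    println(leftSemi(sales, sales))
  }
}
```

Both functions keep only the two order-1 rows. The semi join form is what lets the planner treat the subquery as an ordinary join with keys and a condition.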
> Note that rows with nulls in the {{LeftSemi}} join key and condition columns
> are not filtered out of the stream side, and the build side has no filter at
> all. We have found that this can become an issue as the dataset size
> increases; in our case, the data contained many nulls that can never match.
> We would like to remove these unnecessary rows early, at the scan and filter
> stages.
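Dropping null keys early is safe because, under SQL's three-valued logic, {{NULL = x}} evaluates to NULL rather than true, so a row with a null key can never satisfy the semi join. A minimal plain-Scala sketch of that argument, assuming {{Option[Int]}} stands in for a nullable int column; {{sqlEquals}}, {{leftSemi}} and {{isNotNull}} are hypothetical helpers, and the data is shaped like the repro table further down.

```scala
object NullKeysNeverMatch {
  // SQL equality under three-valued logic: if either side is NULL (None) the
  // result is NULL (None), which a join treats the same as false.
  def sqlEquals(a: Option[Int], b: Option[Int]): Option[Boolean] =
    for (x <- a; y <- b) yield x == y

  // Semi join on a nullable key: keep a left row only if some right row compares as TRUE.
  def leftSemi(left: Seq[Option[Int]], right: Seq[Option[Int]]): Seq[Option[Int]] =
    left.filter(l => right.exists(r => sqlEquals(l, r).contains(true)))

  // The early filter this issue asks for: drop NULL keys before joining.
  def isNotNull(rows: Seq[Option[Int]]): Seq[Option[Int]] = rows.filter(_.isDefined)

  // Shaped like the repro table: even positions are NULL, odd positions hold i.
  val table: Seq[Option[Int]] = (0 until 10).map(i => if (i % 2 == 0) None else Some(i))

  def main(args: Array[String]): Unit = {
    val unfiltered = leftSemi(table, table)
    val prefiltered = leftSemi(isNotNull(table), isNotNull(table))
    // Same rows either way; the pre-filtered join just has fewer rows to scan and compare.
    println(s"unfiltered:  $unfiltered")
    println(s"prefiltered: $prefiltered")
  }
}
```

The result is identical with or without the filter, while each side of the pre-filtered join has half as many rows.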
> The change we made adds {{isnotnull}} predicates for the join key and the
> condition columns to the stream-side filter, and adds a new filter on the
> build side:
> {code:sql}
> +- *(9) SortMergeJoin [cs_order_number#17L], [cs_order_number#943L], LeftSemi, NOT (cs_warehouse_sk#14 = cs_warehouse_sk#940)
>    :- *(2) Sort [cs_order_number#17L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(cs_order_number#17L, 200), ENSURE_REQUIREMENTS, [id=#759]
>    :     +- *(1) Filter ((((isnotnull(cs_ship_date_sk#2) AND isnotnull(cs_ship_addr_sk#10)) AND isnotnull(cs_call_center_sk#11)) AND isnotnull(cs_order_number#17L)) AND isnotnull(cs_warehouse_sk#14))
>    :        +- *(1) ColumnarToRow
>    :           +- FileScan parquet ..., DataFilters: [isnotnull(cs_ship_date_sk#2), isnotnull(cs_ship_addr_sk#10), isnotnull(cs_call_center_sk#11), is..., ..., PushedFilters: [IsNotNull(cs_ship_date_sk), IsNotNull(cs_ship_addr_sk), IsNotNull(cs_call_center_sk), IsNotNull(..., ReadSchema: ...
>    +- *(4) Sort [cs_order_number#943L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(cs_order_number#943L, 200), ENSURE_REQUIREMENTS, [id=#768]
>          +- *(3) Filter (isnotnull(cs_order_number#943L) AND isnotnull(cs_warehouse_sk#940))
>             +- *(3) ColumnarToRow
>                +- FileScan parquet ..., DataFilters: [isnotnull(cs_order_number#943L), isnotnull(cs_warehouse_sk#940)], ..., PartitionFilters: [], PushedFilters: [IsNotNull(cs_order_number), IsNotNull(cs_warehouse_sk)], ReadSchema: ...
> {code}
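These particular {{isnotnull}} predicates follow from the join condition itself. Both conjuncts, the equality and the {{NOT (... = ...)}}, are null-intolerant: a null input makes them NULL rather than true, so every column they reference must be non-null on its own side. The sketch below models that reasoning with a tiny hypothetical expression type; it is a simplified illustration, not Catalyst's implementation of {{InferFiltersFromConstraints}}.

```scala
object InferNotNullFromJoinCondition {
  // Hypothetical, minimal expression type; not Catalyst's classes.
  sealed trait Expr
  final case class Attr(name: String, side: String) extends Expr // side is "left" or "right"
  final case class EqualTo(left: Expr, right: Expr) extends Expr
  final case class Not(child: Expr) extends Expr
  final case class And(left: Expr, right: Expr) extends Expr

  // Split a conjunction into its individual conjuncts.
  def conjuncts(e: Expr): Seq[Expr] = e match {
    case And(l, r) => conjuncts(l) ++ conjuncts(r)
    case other     => Seq(other)
  }

  // Attributes that must be non-null for an expression to evaluate to TRUE.
  // Attr, EqualTo and Not are null-intolerant (a NULL input yields NULL);
  // And is TRUE only when both sides are TRUE, so the requirements add up.
  def notNullAttrs(e: Expr): Set[Attr] = e match {
    case a: Attr       => Set(a)
    case EqualTo(l, r) => notNullAttrs(l) ++ notNullAttrs(r)
    case Not(c)        => notNullAttrs(c)
    case And(l, r)     => notNullAttrs(l) ++ notNullAttrs(r)
  }

  // Column names that can get an isnotnull filter, grouped by join side.
  def inferredFilters(condition: Expr): Map[String, Set[String]] =
    conjuncts(condition)
      .flatMap(e => notNullAttrs(e))
      .distinct
      .groupBy(_.side)
      .map { case (side, attrs) => side -> attrs.map(_.name).toSet }

  // The LeftSemi condition from the plans above:
  //   cs_order_number = cs_order_number AND NOT (cs_warehouse_sk = cs_warehouse_sk)
  val semiJoinCondition: Expr = And(
    EqualTo(Attr("cs_order_number", "left"), Attr("cs_order_number", "right")),
    Not(EqualTo(Attr("cs_warehouse_sk", "left"), Attr("cs_warehouse_sk", "right")))
  )

  def main(args: Array[String]): Unit =
    for ((side, cols) <- inferredFilters(semiJoinCondition).toSeq.sortBy(_._1)) {
      println(s"$side: " + cols.toSeq.sorted.map(c => s"isnotnull($c)").mkString(" AND "))
    }
}
```

Each side ends up with {{isnotnull}} on both {{cs_order_number}} and {{cs_warehouse_sk}}, matching the extra predicates in the plan above.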
> This issue can be boiled down to this simple repro:
> {code:scala}
> // Run in spark-shell, which predefines `sc` and `spark` and imports spark.implicits._ (needed for toDF).
> // Write a one-column table in which every even row is null.
> sc.parallelize((0 until 10).map(i => if (i % 2 == 0) {null} else {Int.box(i)})).toDF.write.parquet("file:///tmp/my_test_table")
> spark.read.parquet("file:///tmp/my_test_table").createOrReplaceTempView("my_table")
> spark.sql("select * from my_table t1 where exists(select * from my_table t2 where t2.value = t1.value)").explain(true)
> {code}
> This produces a similar plan, with a {{LeftSemi}} join and no filters:
> {code:sql}
> == Physical Plan ==
> *(2) BroadcastHashJoin [value#19], [value#22], LeftSemi, BuildRight, false
> :- *(2) ColumnarToRow
> :  +- FileScan parquet [value#19] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/my_test_table], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:int>
> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#125]
>    +- *(1) ColumnarToRow
>       +- FileScan parquet [value#22] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/my_test_table], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:int>
> {code}
> If we naively add an extra optimizer batch for
> {{InferFiltersFromConstraints}} (see
> https://github.com/abellina/spark/commit/8aaeb89151e04101c9513d7d7abd21cd00348acb),
> we get the desired physical plan:
> {code:sql}
> == Physical Plan ==
> *(2) BroadcastHashJoin [value#7], [value#13], LeftSemi, BuildRight, false
> :- *(2) Filter isnotnull(value#7)
> :  +- *(2) ColumnarToRow
> :     +- FileScan parquet [value#7] Batched: true, DataFilters: [isnotnull(value#7)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/my_test_table], PartitionFilters: [], PushedFilters: [IsNotNull(value)], ReadSchema: struct<value:int>
> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#146]
>    +- *(1) Filter isnotnull(value#13)
>       +- *(1) ColumnarToRow
>          +- FileScan parquet [value#13] Batched: true, DataFilters: [isnotnull(value#13)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/my_test_table], PartitionFilters: [], PushedFilters: [IsNotNull(value)], ReadSchema: struct<value:
> {code}
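The ordering problem, and why one more inference pass helps, can be modeled with a toy optimizer. Everything here is hypothetical and heavily simplified: rules apply only to the root of the plan and each runs once, whereas Spark's optimizer transforms whole trees in batches. {{inferNotNull}} and {{rewriteExists}} are stand-ins for {{InferFiltersFromConstraints}} and {{RewritePredicateSubquery}}, and the sketch does not reproduce the linked commit.

```scala
object RuleOrderingToy {
  // Hypothetical toy plan nodes, loosely named after the operators in this issue.
  sealed trait Plan
  final case class Scan(table: String) extends Plan
  final case class Filter(condition: String, child: Plan) extends Plan
  final case class FilterExists(key: String, outer: Plan, inner: Plan) extends Plan
  final case class LeftSemiJoin(key: String, left: Plan, right: Plan) extends Plan

  type Rule = Plan => Plan

  // Wrap a child in an isnotnull filter unless it already has that exact filter.
  private def withNotNull(key: String, child: Plan): Plan = child match {
    case f @ Filter(cond, _) if cond == s"isnotnull($key)" => f
    case other                                              => Filter(s"isnotnull($key)", other)
  }

  // Stand-in for InferFiltersFromConstraints: it only understands joins, so an
  // EXISTS that has not been rewritten yet gives it nothing to infer from.
  val inferNotNull: Rule = {
    case LeftSemiJoin(k, l, r) => LeftSemiJoin(k, withNotNull(k, l), withNotNull(k, r))
    case other                 => other
  }

  // Stand-in for RewritePredicateSubquery: EXISTS becomes a LeftSemi join.
  val rewriteExists: Rule = {
    case FilterExists(k, outer, inner) => LeftSemiJoin(k, outer, inner)
    case other                         => other
  }

  // Apply each rule once, in order, to the root of the plan.
  def run(rules: Seq[Rule], plan: Plan): Plan = rules.foldLeft(plan)((p, rule) => rule(p))

  val query: Plan = FilterExists("value", Scan("t1"), Scan("t2"))

  // The order described in this issue: inference runs before the rewrite, so nothing is added.
  val inferenceFirst: Plan = run(Seq(inferNotNull, rewriteExists), query)
  // The naive fix: run inference once more after the rewrite.
  val extraInferencePass: Plan = run(Seq(inferNotNull, rewriteExists, inferNotNull), query)

  def main(args: Array[String]): Unit = {
    println(inferenceFirst)
    println(extraInferencePass)
  }
}
```

Only the extra pass wraps both join inputs in {{isnotnull(value)}}. Because {{withNotNull}} skips children that already carry the filter, running the inference rule again leaves the plan unchanged.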



--
This message was sent by Atlassian Jira
(v8.20.7#820007)