[ https://issues.apache.org/jira/browse/SPARK-39131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Alessandro Bellina updated SPARK-39131:
---------------------------------------
Component/s: SQL
> "Exists" is optimized too late (to LeftSemi) preventing filters to be inferred
> ------------------------------------------------------------------------------
>
> Key: SPARK-39131
> URL: https://issues.apache.org/jira/browse/SPARK-39131
> Project: Spark
> Issue Type: Bug
> Components: Optimizer, SQL
> Affects Versions: 3.2.1
> Reporter: Alessandro Bellina
> Priority: Major
>
> We would like to propose a slight change in the execution order of the
> logical plan optimizer rules, motivated by a performance issue we have seen:
> the {{LeftSemi}} join is materialized too late in the logical plan optimizer,
> so it does not benefit from the null filtering that
> {{InferFiltersFromConstraints}} can insert.
> I have "something that works" locally (see the rest of the description for
> details and a diff), but given that this is the optimizer it is not clear
> what else it could break, so I'd like to hear from the experts on whether
> this is the right change.
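> For context, in {{Optimizer.defaultBatches}} the inference rule runs in the
> main operator optimization batches, while {{RewritePredicateSubquery}} only
> runs in the final "RewriteSubquery" batch, so the {{LeftSemi}} it produces is
> never revisited by {{InferFiltersFromConstraints}}. A rough, paraphrased
> sketch of the relevant ordering (simplified, not the exact source):
> {code:java}
> // Paraphrased from Optimizer.defaultBatches (rule names kept, order simplified):
> //   Batch("Infer Filters", Once,
> //     InferFiltersFromConstraints)     // runs here, before Exists is rewritten
> //   ... many batches later ...
> //   Batch("RewriteSubquery", Once,
> //     RewritePredicateSubquery, ...)   // Exists => LeftSemi happens only now
> {code}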
> The query in question is based on TPC-DS query 16, which originally has an
> {{exists}} filter:
> {code:sql}
> …
> and exists (select *
> from catalog_sales cs2
> where cs1.cs_order_number = cs2.cs_order_number
> and cs1.cs_warehouse_sk <> cs2.cs_warehouse_sk)
> …
> {code}
> The rule {{RewritePredicateSubquery}} will turn this into a {{LeftSemi}} join
> like so:
> {code:sql}
> +- *(9) SortMergeJoin [cs_order_number#17L], [cs_order_number#872L], LeftSemi, NOT (cs_warehouse_sk#14 = cs_warehouse_sk#869)
>    :- *(2) Sort [cs_order_number#17L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(cs_order_number#17L, 200), ENSURE_REQUIREMENTS, [id=#364]
>    :     +- *(1) Filter ((isnotnull(cs_ship_date_sk#2) AND isnotnull(cs_ship_addr_sk#10)) AND isnotnull(cs_call_center_sk#11))
>    :        +- *(1) ColumnarToRow
>    :           +- FileScan parquet [...] Batched: true, DataFilters: [isnotnull(cs_ship_date_sk#2), isnotnull(cs_ship_addr_sk#10), isnotnull(cs_call_center_sk#11)], ..., PushedFilters: [IsNotNull(cs_ship_date_sk), IsNotNull(cs_ship_addr_sk), IsNotNull(cs_call_center_sk)], ReadSchema: ...
>    +- *(4) Sort [cs_order_number#872L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(cs_order_number#872L, 200), ENSURE_REQUIREMENTS, [id=#372]
>          +- *(3) ColumnarToRow
>             +- FileScan parquet [...] Batched: true, DataFilters: [], ..., PushedFilters: [], ReadSchema: ...
> {code}
> Note that null filters for the {{LeftSemi}} key and condition columns are not
> being applied on the stream side, and the build side has no filter at all. We
> have found that as the dataset size grows this becomes an issue; in our case
> there were many null rows that could never match. We would like to remove
> those unnecessary rows early, at the scan and filter stages.
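> Filtering these null rows early is semantics-preserving: a null join key can
> never satisfy the equality condition, so a {{LeftSemi}} discards such rows
> whether or not we filter them first. A minimal spark-shell sketch (mine, not
> from the query above) illustrating this:
> {code:java}
> import spark.implicits._  // already in scope in the spark-shell
>
> // null keys never satisfy an equi-join condition, so the LeftSemi
> // drops those rows with or without an early isnotnull filter:
> val left  = Seq[Integer](1, null, 3).toDF("k")
> val right = Seq[Integer](1, 2, null).toDF("k")
> left.join(right, left("k") === right("k"), "left_semi").show()
> // only k = 1 survives; the null row on the left never matches
> {code}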
> The change we made allows null filters for the join key and the condition
> columns to be added on the stream side, and a corresponding filter to be
> added on the build side:
> {code:sql}
> +- *(9) SortMergeJoin [cs_order_number#17L], [cs_order_number#943L], LeftSemi, NOT (cs_warehouse_sk#14 = cs_warehouse_sk#940)
>    :- *(2) Sort [cs_order_number#17L ASC NULLS FIRST], false, 0
>    :  +- Exchange hashpartitioning(cs_order_number#17L, 200), ENSURE_REQUIREMENTS, [id=#759]
>    :     +- *(1) Filter ((((isnotnull(cs_ship_date_sk#2) AND isnotnull(cs_ship_addr_sk#10)) AND isnotnull(cs_call_center_sk#11)) AND isnotnull(cs_order_number#17L)) AND isnotnull(cs_warehouse_sk#14))
>    :        +- *(1) ColumnarToRow
>    :           +- FileScan parquet ..., DataFilters: [isnotnull(cs_ship_date_sk#2), isnotnull(cs_ship_addr_sk#10), isnotnull(cs_call_center_sk#11), is..., ..., PushedFilters: [IsNotNull(cs_ship_date_sk), IsNotNull(cs_ship_addr_sk), IsNotNull(cs_call_center_sk), IsNotNull(..., ReadSchema: ...
>    +- *(4) Sort [cs_order_number#943L ASC NULLS FIRST], false, 0
>       +- Exchange hashpartitioning(cs_order_number#943L, 200), ENSURE_REQUIREMENTS, [id=#768]
>          +- *(3) Filter (isnotnull(cs_order_number#943L) AND isnotnull(cs_warehouse_sk#940))
>             +- *(3) ColumnarToRow
>                +- FileScan parquet ..., DataFilters: [isnotnull(cs_order_number#943L), isnotnull(cs_warehouse_sk#940)], ..., PartitionFilters: [], PushedFilters: [IsNotNull(cs_order_number), IsNotNull(cs_warehouse_sk)], ReadSchema: ...
> {code}
> This issue can be boiled down to this simple repro:
> {code:java}
> sc.parallelize((0 until 10).map(i => if (i % 2 == 0) {null} else {Int.box(i)})).toDF.write.parquet("file:///tmp/my_test_table")
> spark.read.parquet("file:///tmp/my_test_table").createOrReplaceTempView("my_table")
> spark.sql("select * from my_table t1 where exists(select * from my_table t2 where t2.value = t1.value)").explain(true)
> {code}
> This produces a similar plan, with a {{LeftSemi}} and no filters:
> {code:sql}
> == Physical Plan ==
> *(2) BroadcastHashJoin [value#19], [value#22], LeftSemi, BuildRight, false
> :- *(2) ColumnarToRow
> :  +- FileScan parquet [value#19] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/my_test_table], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:int>
> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [id=#125]
>    +- *(1) ColumnarToRow
>       +- FileScan parquet [value#22] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/my_test_table], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:int>
> {code}
> If we naively add an extra optimizer batch for {{InferFiltersFromConstraints}}
> (https://github.com/abellina/spark/commit/8aaeb89151e04101c9513d7d7abd21cd00348acb),
> we get the desired physical plan:
> {code:sql}
> == Physical Plan ==
> *(2) BroadcastHashJoin [value#7], [value#13], LeftSemi, BuildRight, false
> :- *(2) Filter isnotnull(value#7)
> :  +- *(2) ColumnarToRow
> :     +- FileScan parquet [value#7] Batched: true, DataFilters: [isnotnull(value#7)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/my_test_table], PartitionFilters: [], PushedFilters: [IsNotNull(value)], ReadSchema: struct<value:int>
> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [id=#146]
>    +- *(1) Filter isnotnull(value#13)
>       +- *(1) ColumnarToRow
>          +- FileScan parquet [value#13] Batched: true, DataFilters: [isnotnull(value#13)], Format: Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/my_test_table], PartitionFilters: [], PushedFilters: [IsNotNull(value)], ReadSchema: struct<value:int>
> {code}
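> For anyone who wants to experiment without patching Spark, registering the
> rule through the public {{spark.experimental.extraOptimizations}} hook should
> approximate the extra batch, since user-provided rules run after the built-in
> batches (a sketch; I have not verified it reproduces the exact plan above):
> {code:java}
> import org.apache.spark.sql.catalyst.optimizer.InferFiltersFromConstraints
>
> // Re-run the inference rule after the built-in batches, i.e. after
> // RewritePredicateSubquery has already produced the LeftSemi join.
> spark.experimental.extraOptimizations ++= Seq(InferFiltersFromConstraints)
> {code}
> This only demonstrates the idea; the linked commit places the extra batch
> inside the optimizer itself rather than in the user-provided hook.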