[
https://issues.apache.org/jira/browse/SPARK-39131?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Alessandro Bellina updated SPARK-39131:
---------------------------------------
Description:
We would like to propose a slight change in the order of execution of logical
plan optimizer rules given a performance issue we have seen with {{LeftSemi}}
being materialized too late in the logical plan optimizer, and not benefiting
from the null filtering that {{InferFiltersFromConstraints}} can insert.
I have "something that works" locally (see rest of the description for info and
a diff), but given that this is the optimizer it is not clear what else I could
be breaking, so I'd like to hear from the experts on whether this is the right
change.
The query in question is based on TPCDS query16 which originally has an
{{exists}} filter:
{code:sql}
…
and exists (select *
from catalog_sales cs2
where cs1.cs_order_number = cs2.cs_order_number
and cs1.cs_warehouse_sk <> cs2.cs_warehouse_sk)
…
{code}
The rule {{RewritePredicateSubquery}} will turn this into a {{LeftSemi}} join
like so:
{code:sql}
+- *(9) SortMergeJoin [cs_order_number#17L], [cs_order_number#872L], LeftSemi,
NOT (cs_warehouse_sk#14 = cs_warehouse_sk#869)
:- *(2) Sort [cs_order_number#17L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(cs_order_number#17L, 200),
ENSURE_REQUIREMENTS, [id=#364]
: +- *(1) Filter ((isnotnull(cs_ship_date_sk#2) AND
isnotnull(cs_ship_addr_sk#10)) AND isnotnull(cs_call_center_sk#11))
: +- *(1) ColumnarToRow
: +- FileScan parquet [...] Batched: true, DataFilters:
[isnotnull(cs_ship_date_sk#2), isnotnull(cs_ship_addr_sk#10),
isnotnull(cs_call_center_sk#11)],..., PushedFilters:
[IsNotNull(cs_ship_date_sk), IsNotNull(cs_ship_addr_sk),
IsNotNull(cs_call_center_sk)], ReadSchema: ...
+- *(4) Sort [cs_order_number#872L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(cs_order_number#872L, 200),
ENSURE_REQUIREMENTS, [id=#372]
+- *(3) ColumnarToRow
+- FileScan parquet [...] Batched: true, DataFilters: [], ...,
PushedFilters: [], ReadSchema: ...
{code}
Note that the {{LeftSemi}} key and condition are not being null filtered on the
stream side, and the build side has no filter at all. We have found that as the
dataset size increases, this can become an issue, and in our case, it was many
nulls that will not match. We would like to remove the unnecessary rows early
at the scan and filter phases.
The change we made allows the join key and the condition to be added to the
stream side filter, and for the build side filter to get added:
{code:sql}
+- *(9) SortMergeJoin [cs_order_number#17L], [cs_order_number#943L], LeftSemi,
NOT (cs_warehouse_sk#14 = cs_warehouse_sk#940)
:- *(2) Sort [cs_order_number#17L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(cs_order_number#17L, 200),
ENSURE_REQUIREMENTS, [id=#759]
: +-*(1) Filter ((((isnotnull(cs_ship_date_sk#2) AND
isnotnull(cs_ship_addr_sk#10)) AND isnotnull(cs_call_center_sk#11)) AND
isnotnull(cs_order_number#17L)) AND isnotnull(cs_warehouse_sk#14))
: +- *(1) ColumnarToRow
: +- FileScan parquet ..., DataFilters:
[isnotnull(cs_ship_date_sk#2), isnotnull(cs_ship_addr_sk#10),
isnotnull(cs_call_center_sk#11), is..., ..., PushedFilters:
[IsNotNull(cs_ship_date_sk), IsNotNull(cs_ship_addr_sk),
IsNotNull(cs_call_center_sk), IsNotNull(..., ReadSchema: ...
+- *(4) Sort [cs_order_number#943L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(cs_order_number#943L, 200),
ENSURE_REQUIREMENTS, [id=#768]
+- *(3) Filter (isnotnull(cs_order_number#943L) AND
isnotnull(cs_warehouse_sk#940))
+- *(3) ColumnarToRow
+- FileScan parquet ..., DataFilters:
[isnotnull(cs_order_number#943L), isnotnull(cs_warehouse_sk#940)], ...,
PartitionFilters: [], PushedFilters: [IsNotNull(cs_order_number),
IsNotNull(cs_warehouse_sk)], ReadSchema: ...
{code}
This issue can be boiled down to this simple repro:
{code:java}
sc.parallelize((0 until 10).map(i => if (i%2 == 0) {null} else
{Int.box(i)})).toDF.write.parquet("file:///tmp/my_test_table")
spark.read.parquet("file:///tmp/my_test_table").createOrReplaceTempView("my_table")
spark.sql("select * from my_table t1 where exists(select * from my_table t2
where t2.value = t1.value)").explain(true)
{code}
Which produces a similar plan, with a {{LeftSemi}} and no filters:
{code:sql}
== Physical Plan ==
*(2) BroadcastHashJoin [value#19], [value#22], LeftSemi, BuildRight, false
:- *(2) ColumnarToRow
: +- FileScan parquet [value#19] Batched: true, DataFilters: [], Format:
Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/my_test_table],
PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:int>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true]
as bigint)),false), [id=#125]
+- *(1) ColumnarToRow
+- FileScan parquet [value#22] Batched: true, DataFilters: [], Format:
Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/my_test_table],
PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:int>
{code}
If we naively add an extra optimizer batch for {{InferFiltersFromConstraints}}:
https://github.com/abellina/spark/commit/8aaeb89151e04101c9513d7d7abd21cd00348acb,
we get the desired physical plan:
{code:sql}
== Physical Plan ==
*(2) BroadcastHashJoin [value#7], [value#13], LeftSemi, BuildRight, false
:- *(2) Filter isnotnull(value#7)
: +- *(2) ColumnarToRow
: +- FileScan parquet [value#7] Batched: true, DataFilters:
[isnotnull(value#7)], Format: Parquet, Location: InMemoryFileIndex(1
paths)[file:/tmp/my_test_table], PartitionFilters: [], PushedFilters:
[IsNotNull(value)], ReadSchema: struct<value:int>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false]
as bigint)),false), [id=#146]
+- *(1) Filter isnotnull(value#13)
+- *(1) ColumnarToRow
+- FileScan parquet [value#13] Batched: true, DataFilters:
[isnotnull(value#13)], Format: Parquet, Location: InMemoryFileIndex(1
paths)[file:/tmp/my_test_table], PartitionFilters: [], PushedFilters:
[IsNotNull(value)], ReadSchema: struct<value:
{code}
was:
We would like to propose a slight change in the order of execution of logical
plan optimizer rules given a performance issue we have seen with {{LeftSemi}}
being materialized too late in the logical plan optimizer, and not benefiting
from the null filtering that {{InferFiltersFromConstraints}} can insert.
I have "something that works" locally (see rest of the description for info and
a diff), but given that this is the optimizer it is not clear what else I could
be breaking, so I'd like to hear from the experts on whether this is the right
change.
The query in question is based on TPCDS query16 which originally has an
{{exists}} filter:
{code:sql}
…
and exists (select *
from catalog_sales cs2
where cs1.cs_order_number = cs2.cs_order_number
and cs1.cs_warehouse_sk <> cs2.cs_warehouse_sk)
…
{code}
The rule {{RewritePredicateSubquery}} will turn this into a {{LeftSemi}} join
like so:
{code:sql}
+- *(9) SortMergeJoin [cs_order_number#17L], [cs_order_number#872L], LeftSemi,
NOT (cs_warehouse_sk#14 = cs_warehouse_sk#869)
:- *(2) Sort [cs_order_number#17L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(cs_order_number#17L, 200),
ENSURE_REQUIREMENTS, [id=#364]
: +- *(1) Filter ((isnotnull(cs_ship_date_sk#2) AND
isnotnull(cs_ship_addr_sk#10)) AND isnotnull(cs_call_center_sk#11))
: +- *(1) ColumnarToRow
: +- FileScan parquet [...] Batched: true, DataFilters:
[isnotnull(cs_ship_date_sk#2), isnotnull(cs_ship_addr_sk#10),
isnotnull(cs_call_center_sk#11)],..., PushedFilters:
[IsNotNull(cs_ship_date_sk), IsNotNull(cs_ship_addr_sk),
IsNotNull(cs_call_center_sk)], ReadSchema: ...
+- *(4) Sort [cs_order_number#872L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(cs_order_number#872L, 200),
ENSURE_REQUIREMENTS, [id=#372]
+- *(3) ColumnarToRow
+- FileScan parquet [...] Batched: true, DataFilters: [], ...,
PushedFilters: [], ReadSchema: ...
{code}
Note that the {{LeftSemi}} key and condition are not being null filtered on the
stream side, and the build side has not filter at all. We have found that as
the dataset size increases, this can become an issue, and in our case, it was
many nulls that will not match. We would like to remove the unnecessary rows
early at the scan and filter phases.
The change we made allows the join key and the condition to be added to the
stream side filter, and for the build side filter to get added:
{code:sql}
+- *(9) SortMergeJoin [cs_order_number#17L], [cs_order_number#943L], LeftSemi,
NOT (cs_warehouse_sk#14 = cs_warehouse_sk#940)
:- *(2) Sort [cs_order_number#17L ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(cs_order_number#17L, 200),
ENSURE_REQUIREMENTS, [id=#759]
: +-*(1) Filter ((((isnotnull(cs_ship_date_sk#2) AND
isnotnull(cs_ship_addr_sk#10)) AND isnotnull(cs_call_center_sk#11)) AND
isnotnull(cs_order_number#17L)) AND isnotnull(cs_warehouse_sk#14))
: +- *(1) ColumnarToRow
: +- FileScan parquet ..., DataFilters:
[isnotnull(cs_ship_date_sk#2), isnotnull(cs_ship_addr_sk#10),
isnotnull(cs_call_center_sk#11), is..., ..., PushedFilters:
[IsNotNull(cs_ship_date_sk), IsNotNull(cs_ship_addr_sk),
IsNotNull(cs_call_center_sk), IsNotNull(..., ReadSchema: ...
+- *(4) Sort [cs_order_number#943L ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(cs_order_number#943L, 200),
ENSURE_REQUIREMENTS, [id=#768]
+- *(3) Filter (isnotnull(cs_order_number#943L) AND
isnotnull(cs_warehouse_sk#940))
+- *(3) ColumnarToRow
+- FileScan parquet ..., DataFilters:
[isnotnull(cs_order_number#943L), isnotnull(cs_warehouse_sk#940)], ...,
PartitionFilters: [], PushedFilters: [IsNotNull(cs_order_number),
IsNotNull(cs_warehouse_sk)], ReadSchema: ...
{code}
This issue can be boiled down to this simple repro:
{code:java}
sc.parallelize((0 until 10).map(i => if (i%2 == 0) {null} else
{Int.box(i)})).toDF.write.parquet("file:///tmp/my_test_table")
spark.read.parquet("file:///tmp/my_test_table").createOrReplaceTempView("my_table")
spark.sql("select * from my_table t1 where exists(select * from my_table t2
where t2.value = t1.value)").explain(true)
{code}
Which produces a similar plan, with a {{LeftSemi}} and no filters:
{code:sql}
== Physical Plan ==
*(2) BroadcastHashJoin [value#19], [value#22], LeftSemi, BuildRight, false
:- *(2) ColumnarToRow
: +- FileScan parquet [value#19] Batched: true, DataFilters: [], Format:
Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/my_test_table],
PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:int>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true]
as bigint)),false), [id=#125]
+- *(1) ColumnarToRow
+- FileScan parquet [value#22] Batched: true, DataFilters: [], Format:
Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/my_test_table],
PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:int>
{code}
If we naively add an extra optimizer batch for {{InferFiltersFromConstraints}}:
https://github.com/abellina/spark/commit/8aaeb89151e04101c9513d7d7abd21cd00348acb,
we get the desired physical plan:
{code:sql}
== Physical Plan ==
*(2) BroadcastHashJoin [value#7], [value#13], LeftSemi, BuildRight, false
:- *(2) Filter isnotnull(value#7)
: +- *(2) ColumnarToRow
: +- FileScan parquet [value#7] Batched: true, DataFilters:
[isnotnull(value#7)], Format: Parquet, Location: InMemoryFileIndex(1
paths)[file:/tmp/my_test_table], PartitionFilters: [], PushedFilters:
[IsNotNull(value)], ReadSchema: struct<value:int>
+- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false]
as bigint)),false), [id=#146]
+- *(1) Filter isnotnull(value#13)
+- *(1) ColumnarToRow
+- FileScan parquet [value#13] Batched: true, DataFilters:
[isnotnull(value#13)], Format: Parquet, Location: InMemoryFileIndex(1
paths)[file:/tmp/my_test_table], PartitionFilters: [], PushedFilters:
[IsNotNull(value)], ReadSchema: struct<value:
{code}
> "Exists" is optimized too late (to LeftSemi) preventing filters to be inferred
> ------------------------------------------------------------------------------
>
> Key: SPARK-39131
> URL: https://issues.apache.org/jira/browse/SPARK-39131
> Project: Spark
> Issue Type: Bug
> Components: Optimizer, SQL
> Affects Versions: 3.2.1
> Reporter: Alessandro Bellina
> Priority: Major
>
> We would like to propose a slight change in the order of execution of logical
> plan optimizer rules given a performance issue we have seen with {{LeftSemi}}
> being materialized too late in the logical plan optimizer, and not benefiting
> from the null filtering that {{InferFiltersFromConstraints}} can insert.
> I have "something that works" locally (see rest of the description for info
> and a diff), but given that this is the optimizer it is not clear what else I
> could be breaking, so I'd like to hear from the experts on whether this is
> the right change.
> The query in question is based on TPCDS query16 which originally has an
> {{exists}} filter:
> {code:sql}
> …
> and exists (select *
> from catalog_sales cs2
> where cs1.cs_order_number = cs2.cs_order_number
> and cs1.cs_warehouse_sk <> cs2.cs_warehouse_sk)
> …
> {code}
> The rule {{RewritePredicateSubquery}} will turn this into a {{LeftSemi}} join
> like so:
> {code:sql}
> +- *(9) SortMergeJoin [cs_order_number#17L], [cs_order_number#872L],
> LeftSemi, NOT (cs_warehouse_sk#14 = cs_warehouse_sk#869)
> :- *(2) Sort [cs_order_number#17L ASC NULLS FIRST], false, 0
> : +- Exchange hashpartitioning(cs_order_number#17L, 200),
> ENSURE_REQUIREMENTS, [id=#364]
> : +- *(1) Filter ((isnotnull(cs_ship_date_sk#2) AND
> isnotnull(cs_ship_addr_sk#10)) AND isnotnull(cs_call_center_sk#11))
> : +- *(1) ColumnarToRow
> : +- FileScan parquet [...] Batched: true, DataFilters:
> [isnotnull(cs_ship_date_sk#2), isnotnull(cs_ship_addr_sk#10),
> isnotnull(cs_call_center_sk#11)],..., PushedFilters:
> [IsNotNull(cs_ship_date_sk), IsNotNull(cs_ship_addr_sk),
> IsNotNull(cs_call_center_sk)], ReadSchema: ...
> +- *(4) Sort [cs_order_number#872L ASC NULLS FIRST], false, 0
> +- Exchange hashpartitioning(cs_order_number#872L, 200),
> ENSURE_REQUIREMENTS, [id=#372]
> +- *(3) ColumnarToRow
> +- FileScan parquet [...] Batched: true, DataFilters: [], ...,
> PushedFilters: [], ReadSchema: ...
> {code}
> Note that the {{LeftSemi}} key and condition are not being null filtered on
> the stream side, and the build side has no filter at all. We have found that
> as the dataset size increases, this can become an issue, and in our case, it
> was many nulls that will not match. We would like to remove the unnecessary
> rows early at the scan and filter phases.
> The change we made allows the join key and the condition to be added to the
> stream side filter, and for the build side filter to get added:
> {code:sql}
> +- *(9) SortMergeJoin [cs_order_number#17L], [cs_order_number#943L],
> LeftSemi, NOT (cs_warehouse_sk#14 = cs_warehouse_sk#940)
> :- *(2) Sort [cs_order_number#17L ASC NULLS FIRST], false, 0
> : +- Exchange hashpartitioning(cs_order_number#17L, 200),
> ENSURE_REQUIREMENTS, [id=#759]
> : +-*(1) Filter ((((isnotnull(cs_ship_date_sk#2) AND
> isnotnull(cs_ship_addr_sk#10)) AND isnotnull(cs_call_center_sk#11)) AND
> isnotnull(cs_order_number#17L)) AND isnotnull(cs_warehouse_sk#14))
> : +- *(1) ColumnarToRow
> : +- FileScan parquet ..., DataFilters:
> [isnotnull(cs_ship_date_sk#2), isnotnull(cs_ship_addr_sk#10),
> isnotnull(cs_call_center_sk#11), is..., ..., PushedFilters:
> [IsNotNull(cs_ship_date_sk), IsNotNull(cs_ship_addr_sk),
> IsNotNull(cs_call_center_sk), IsNotNull(..., ReadSchema: ...
> +- *(4) Sort [cs_order_number#943L ASC NULLS FIRST], false, 0
> +- Exchange hashpartitioning(cs_order_number#943L, 200),
> ENSURE_REQUIREMENTS, [id=#768]
> +- *(3) Filter (isnotnull(cs_order_number#943L) AND
> isnotnull(cs_warehouse_sk#940))
> +- *(3) ColumnarToRow
> +- FileScan parquet ..., DataFilters:
> [isnotnull(cs_order_number#943L), isnotnull(cs_warehouse_sk#940)], ...,
> PartitionFilters: [], PushedFilters: [IsNotNull(cs_order_number),
> IsNotNull(cs_warehouse_sk)], ReadSchema: ...
> {code}
> This issue can be boiled down to this simple repro:
> {code:java}
> sc.parallelize((0 until 10).map(i => if (i%2 == 0) {null} else
> {Int.box(i)})).toDF.write.parquet("file:///tmp/my_test_table")
> spark.read.parquet("file:///tmp/my_test_table").createOrReplaceTempView("my_table")
> spark.sql("select * from my_table t1 where exists(select * from my_table t2
> where t2.value = t1.value)").explain(true)
> {code}
> Which produces a similar plan, with a {{LeftSemi}} and no filters:
> {code:sql}
> == Physical Plan ==
> *(2) BroadcastHashJoin [value#19], [value#22], LeftSemi, BuildRight, false
> :- *(2) ColumnarToRow
> : +- FileScan parquet [value#19] Batched: true, DataFilters: [], Format:
> Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/my_test_table],
> PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:int>
> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int,
> true] as bigint)),false), [id=#125]
> +- *(1) ColumnarToRow
> +- FileScan parquet [value#22] Batched: true, DataFilters: [], Format:
> Parquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/my_test_table],
> PartitionFilters: [], PushedFilters: [], ReadSchema: struct<value:int>
> {code}
> If we naively add an extra optimizer batch for
> {{InferFiltersFromConstraints}}:
> https://github.com/abellina/spark/commit/8aaeb89151e04101c9513d7d7abd21cd00348acb,
> we get the desired physical plan:
> {code:sql}
> == Physical Plan ==
> *(2) BroadcastHashJoin [value#7], [value#13], LeftSemi, BuildRight, false
> :- *(2) Filter isnotnull(value#7)
> : +- *(2) ColumnarToRow
> : +- FileScan parquet [value#7] Batched: true, DataFilters:
> [isnotnull(value#7)], Format: Parquet, Location: InMemoryFileIndex(1
> paths)[file:/tmp/my_test_table], PartitionFilters: [], PushedFilters:
> [IsNotNull(value)], ReadSchema: struct<value:int>
> +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int,
> false] as bigint)),false), [id=#146]
> +- *(1) Filter isnotnull(value#13)
> +- *(1) ColumnarToRow
> +- FileScan parquet [value#13] Batched: true, DataFilters:
> [isnotnull(value#13)], Format: Parquet, Location: InMemoryFileIndex(1
> paths)[file:/tmp/my_test_table], PartitionFilters: [], PushedFilters:
> [IsNotNull(value)], ReadSchema: struct<value:
> {code}
--
This message was sent by Atlassian Jira
(v8.20.7#820007)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]