[
https://issues.apache.org/jira/browse/SPARK-34081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Yuming Wang updated SPARK-34081:
--------------------------------
Description:
Pushing LeftSemi/LeftAnti joins down through Aggregate is not always beneficial: when the join cannot be planned as a broadcast join, the pushdown removes the aggregation below the join, so the shuffle join has to process the full, unreduced left side. For example:
{code:scala}
spark.range(50000000L).selectExpr("id % 10000 as a", "id % 10000 as b").write.saveAsTable("t1")
spark.range(40000000L).selectExpr("id % 8000 as c", "id % 8000 as d").write.saveAsTable("t2")
spark.sql("SELECT distinct a, b FROM t1 INTERSECT SELECT distinct c, d FROM t2").explain
{code}
Current:
{noformat}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#16L, b#17L], functions=[])
   +- HashAggregate(keys=[a#16L, b#17L], functions=[])
      +- HashAggregate(keys=[a#16L, b#17L], functions=[])
         +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#72]
            +- HashAggregate(keys=[a#16L, b#17L], functions=[])
               +- SortMergeJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi
                  :- Sort [coalesce(a#16L, 0) ASC NULLS FIRST, isnull(a#16L) ASC NULLS FIRST, coalesce(b#17L, 0) ASC NULLS FIRST, isnull(b#17L) ASC NULLS FIRST], false, 0
                  :  +- Exchange hashpartitioning(coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L), 5), ENSURE_REQUIREMENTS, [id=#65]
                  :     +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
                  +- Sort [coalesce(c#18L, 0) ASC NULLS FIRST, isnull(c#18L) ASC NULLS FIRST, coalesce(d#19L, 0) ASC NULLS FIRST, isnull(d#19L) ASC NULLS FIRST], false, 0
                     +- Exchange hashpartitioning(coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L), 5), ENSURE_REQUIREMENTS, [id=#66]
                        +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                           +- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=#61]
                              +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                                 +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:bigint,d:bigint>
{noformat}
Expected:
{noformat}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#16L, b#17L], functions=[])
   +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#74]
      +- HashAggregate(keys=[a#16L, b#17L], functions=[])
         +- SortMergeJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi
            :- Sort [coalesce(a#16L, 0) ASC NULLS FIRST, isnull(a#16L) ASC NULLS FIRST, coalesce(b#17L, 0) ASC NULLS FIRST, isnull(b#17L) ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L), 5), ENSURE_REQUIREMENTS, [id=#67]
            :     +- HashAggregate(keys=[a#16L, b#17L], functions=[])
            :        +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#61]
            :           +- HashAggregate(keys=[a#16L, b#17L], functions=[])
            :              +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
            +- Sort [coalesce(c#18L, 0) ASC NULLS FIRST, isnull(c#18L) ASC NULLS FIRST, coalesce(d#19L, 0) ASC NULLS FIRST, isnull(d#19L) ASC NULLS FIRST], false, 0
               +- Exchange hashpartitioning(coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L), 5), ENSURE_REQUIREMENTS, [id=#68]
                  +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                     +- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=#63]
                        +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                           +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:bigint,d:bigint>
{noformat}
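The direction suggested by the issue title is to apply the pushdown only when the join can be planned as a broadcast hash join. A minimal sketch of such a guard, assuming Catalyst's {{JoinSelectionHelper.canPlanAsBroadcastHashJoin}} helper is available; the rule name and the elided fall-through are illustrative, not the actual patch:
{code:scala}
import org.apache.spark.sql.catalyst.optimizer.JoinSelectionHelper
import org.apache.spark.sql.catalyst.plans.{LeftAnti, LeftSemi}
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, LogicalPlan}
import org.apache.spark.sql.catalyst.rules.Rule

// Illustrative guard, not the merged patch: keep the Aggregate below a
// LeftSemi/LeftAnti join unless the join is small enough to broadcast.
object GuardedLeftSemiAntiPushdown extends Rule[LogicalPlan] with JoinSelectionHelper {
  def apply(plan: LogicalPlan): LogicalPlan = plan transform {
    case j @ Join(_: Aggregate, _, LeftSemi | LeftAnti, _, _)
        if !canPlanAsBroadcastHashJoin(j, conf) =>
      // Returning the node unchanged keeps the partial aggregation that
      // shrinks the left side before the SortMergeJoin's shuffle.
      j
    // ... otherwise defer to the existing PushDownLeftSemiAntiJoin logic ...
  }
}
{code}
With a guard of this shape, the SortMergeJoin repro above keeps its aggregates below the join, while the broadcast case is unaffected.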
was:
{code:scala}
spark.range(5).selectExpr("id as a", "id as b").write.saveAsTable("t1")
spark.range(3).selectExpr("id as c", "id as d").write.saveAsTable("t2")
spark.sql("SELECT distinct a, b FROM t1 INTERSECT SELECT distinct c, d FROM t2").explain
{code}
Current:
{noformat}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#16L, b#17L], functions=[])
   +- HashAggregate(keys=[a#16L, b#17L], functions=[])
      +- HashAggregate(keys=[a#16L, b#17L], functions=[])
         +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#68]
            +- HashAggregate(keys=[a#16L, b#17L], functions=[])
               +- BroadcastHashJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi, BuildRight, false
                  :- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32289/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
                  +- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[0, bigint, true], 0), isnull(input[0, bigint, true]), coalesce(input[1, bigint, true], 0), isnull(input[1, bigint, true])),false), [id=#64]
                     +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                        +- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=#61]
                           +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                              +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32289/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:bigint,d:bigint>
{noformat}
Expected:
{noformat}
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[a#16L, b#17L], functions=[])
   +- HashAggregate(keys=[a#16L, b#17L], functions=[])
      +- BroadcastHashJoin [coalesce(a#16L, 0), isnull(a#16L), coalesce(b#17L, 0), isnull(b#17L)], [coalesce(c#18L, 0), isnull(c#18L), coalesce(d#19L, 0), isnull(d#19L)], LeftSemi, BuildRight, false
         :- HashAggregate(keys=[a#16L, b#17L], functions=[])
         :  +- Exchange hashpartitioning(a#16L, b#17L, 5), ENSURE_REQUIREMENTS, [id=#61]
         :     +- HashAggregate(keys=[a#16L, b#17L], functions=[])
         :        +- FileScan parquet default.t1[a#16L,b#17L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32289/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<a:bigint,b:bigint>
         +- BroadcastExchange HashedRelationBroadcastMode(List(coalesce(input[0, bigint, true], 0), isnull(input[0, bigint, true]), coalesce(input[1, bigint, true], 0), isnull(input[1, bigint, true])),false), [id=#66]
            +- HashAggregate(keys=[c#18L, d#19L], functions=[])
               +- Exchange hashpartitioning(c#18L, d#19L, 5), ENSURE_REQUIREMENTS, [id=#63]
                  +- HashAggregate(keys=[c#18L, d#19L], functions=[])
                     +- FileScan parquet default.t2[c#18L,d#19L] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[file:/Users/yumwang/spark/SPARK-32289/spark-warehouse/org.apache.spark.sql.Data..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c:bigint,d:bigint>
{noformat}
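In both repros the two plan shapes hinge on whether the join's build side is broadcastable, which can be checked from spark-shell by toggling the real {{spark.sql.autoBroadcastJoinThreshold}} config (the threshold values below are just examples):
{code:scala}
// With broadcast joins disabled, the INTERSECT is planned as a SortMergeJoin
// and pushing the semi join below the Aggregate is harmful: the left side is
// joined before deduplication.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1L)
spark.sql("SELECT distinct a, b FROM t1 INTERSECT SELECT distinct c, d FROM t2").explain()

// With the default 10MB threshold, a small t2 side becomes a BroadcastHashJoin
// and the pushdown avoids an extra shuffle, so it pays off.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 10L * 1024 * 1024)
spark.sql("SELECT distinct a, b FROM t1 INTERSECT SELECT distinct c, d FROM t2").explain()
{code}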
> Only pushdown LeftSemi/LeftAnti over Aggregate if join can be planned as broadcast join
> ---------------------------------------------------------------------------------------
>
> Key: SPARK-34081
> URL: https://issues.apache.org/jira/browse/SPARK-34081
> Project: Spark
> Issue Type: Bug
> Components: SQL
> Affects Versions: 3.2.0
> Reporter: Yuming Wang
> Priority: Major
>