[ https://issues.apache.org/jira/browse/SPARK-40703?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615255#comment-17615255 ]

Chao Sun commented on SPARK-40703:
----------------------------------

Thanks [~bryanck]. Now I see where the issue is.

In your pyspark example, one side reports {{UnknownPartitioning}} while the other 
side reports {{SinglePartition}}. Later on, Spark will insert a shuffle for the 
{{UnknownPartitioning}} side so it becomes {{HashPartitioning}}. In this 
particular case, when Spark is deciding which side to shuffle, it'll pick the 
{{HashPartitioning}} side again and convert it into the same 
{{HashPartitioning}} but with {{numPartitions = 1}}.

Before:
{code}
 ShuffleExchange(HashPartition(200))  <-->  SinglePartition
{code}
(suppose {{spark.sql.shuffle.partitions}} is 200)

After:
{code}
 ShuffleExchange(HashPartition(1))  <-->  SinglePartition
{code}
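
For reference, a minimal PySpark sketch of that shape (this is not the attached test.py; here {{coalesce(1)}} stands in for a single-partition DSv2 scan, since it also reports {{SinglePartition}}):
{code}
from pyspark.sql import SparkSession

spark = (SparkSession.builder
         .appName("SPARK-40703-sketch")
         .config("spark.sql.autoBroadcastJoinThreshold", "-1")   # keep the shuffle-based join path
         .config("spark.sql.shuffle.partitions", "200")
         .getOrCreate())

# Large side: needs a shuffle for the join, so it gets ShuffleExchange(HashPartitioning(200)).
large = spark.range(0, 10000000).withColumnRenamed("id", "cs_item_sk")

# Small side: coalesce(1) reports SinglePartition, standing in for a single-partition scan.
small = spark.range(0, 1000).withColumnRenamed("id", "i_item_sk").coalesce(1)

joined = large.join(small, large["cs_item_sk"] == small["i_item_sk"])
joined.explain()   # on 3.3 the inserted exchange can end up with numPartitions = 1
{code}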
 
The reason Spark chooses to do it this way is that there is a trade-off 
between shuffle cost and parallelism. At the moment, when Spark sees that one 
side of the join has a {{ShuffleExchange}} (meaning it needs to be shuffled 
anyway) and the other side doesn't, it'll try to avoid shuffling the other 
side.

This makes more sense if we have:
{code}
ShuffleExchange(HashPartition(200)) <-> HashPartition(150)
{code}

as in this case, Spark will avoid shuffling the right-hand side and instead just 
change the number of shuffle partitions on the left:
{code}
ShuffleExchange(HashPartition(150)) <-> HashPartition(150)
{code}
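
One way to observe which side actually gets the exchange, and with how many partitions (assuming the {{joined}} DataFrame from the sketch above; the expected numbers are my assumption), would be:
{code}
# Hedged check: the formatted plan shows the inserted Exchange and its
# hashpartitioning(..., N); getNumPartitions() shows the effective join parallelism.
joined.explain(mode="formatted")
print(joined.rdd.getNumPartitions())   # 200 on Spark 3.2; can collapse to 1 on 3.3 in this scenario
{code}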

I feel we can treat {{SinglePartition}} as a special case here. Let me see 
if I can come up with a PR.

> Performance regression for joins in Spark 3.3 vs Spark 3.2
> ----------------------------------------------------------
>
>                 Key: SPARK-40703
>                 URL: https://issues.apache.org/jira/browse/SPARK-40703
>             Project: Spark
>          Issue Type: Bug
>          Components: SQL
>    Affects Versions: 3.3.0
>            Reporter: Bryan Keller
>            Priority: Major
>         Attachments: spark32-plan.txt, spark33-plan.txt, test.py
>
>
> When running the TPC-DS benchmarks using a DSv2 datasource in Spark 3.3, a 
> performance regression vs Spark 3.2 was discovered. More specifically, it 
> appears as if _EnsureRequirements.ensureDistributionAndOrdering_() no 
> longer enforces a minimum number of partitions for a join distribution in 
> some cases. This impacts DSv2 datasources because, if a scan has only a 
> single read partition, _DataSourceV2ScanExecBase.outputPartitioning_() 
> returns a _SinglePartition_ instance. The _SinglePartition_ creates a 
> _SinglePartitionShuffleSpec_, and 
> _SinglePartitionShuffleSpec.canCreatePartitioning_() returns true.
> Because _canCreatePartitioning_() returns true in this case, 
> _EnsureRequirements.ensureDistributionAndOrdering_() won't enforce 
> minimum parallelism and will also favor the single partition when considering 
> the best distribution candidate. Ultimately this results in a single 
> partition being selected for the join distribution, even if the other side of 
> the join is a large table with many partitions. This can seriously impact 
> the performance of the join.
> Spark 3.2 enforces minimum parallelism differently in 
> _ensureDistributionAndOrdering_() and thus does not suffer from this 
> issue. It will shuffle both sides of the join to enforce parallelism.
> In the TPC-DS benchmark, affected queries include 14a and 14b. The issue can 
> also be demonstrated using a simple query, for example:
> {{select ics.i_item_sk from catalog_sales cs join item ics on cs.cs_item_sk = 
> ics.i_item_sk}}
> ...where _item_ is a small table that is read into one partition, and 
> _catalog_sales_ is a large table. These tables are part of the TPC-DS schema, 
> but you can create your own. To demonstrate the issue you may need to turn 
> off broadcast joins, though that is not required for the issue to occur; it 
> also happens when running the TPC-DS with the broadcast setting at its default.
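> A hedged way to try this outside the full benchmark (assuming the two tables 
> are already registered in the session) is:
> {code}
> # Disable broadcast joins so the shuffle-based join path is exercised.
> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
> spark.sql("""
>     select ics.i_item_sk
>     from catalog_sales cs
>     join item ics on cs.cs_item_sk = ics.i_item_sk
> """).explain()
> {code}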
> Attached are the plans for this query in Spark 3.2 and Spark 3.3. They show 
> how, in Spark 3.2, a join parallelism of 200 is reached by inserting an 
> exchange after the _item_ table scan, whereas in Spark 3.3 no such exchange is 
> inserted and the join parallelism is 1.


