Ala Luszczak commented on SPARK-23496:

I agree that this solution is merely making the problem unlikely to occur, 
instead of really solving it.

But the code in {{DefaultPartitionCoalescer.setupGroups()}} is deliberately 
written so that it's fast (O(n log n) with respect to the number of coalesced 
partitions, which is assumed to be an order of magnitude smaller than the 
number of input partitions), but not necessarily accurate. The same applies to 
the other algorithms there.

Enforcing an even data distribution is not trivial. For example:
 * We merely look at the number of partitions, not at the number of rows in 
each of the partitions. There might be a severe skew across the partitions to 
begin with.
 * It's not clear how to treat partitions with multiple preferred locations.
 * It's not clear if it's more important for every input location to find some 
matching coalesced partition, or if it's more important to keep the partition 
size even.
 * It's not clear how best to deal with a mix of partitions with and without 
locality preferences.

I think it's better to have a very simple fix now that will work well the vast 
majority of the time, and maybe open a follow-up ticket to revisit the design 
of {{DefaultPartitionCoalescer}} later.

> Locality of coalesced partitions can be severely skewed by the order of input 
> partitions
> ----------------------------------------------------------------------------------------
>                 Key: SPARK-23496
>                 URL: https://issues.apache.org/jira/browse/SPARK-23496
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 3.0.0
>            Reporter: Ala Luszczak
>            Priority: Major
> Example:
> Consider RDD "R" with 100 partitions, half of which have locality preference 
> "hostA" and half have "hostB".
>  * Assume odd-numbered input partitions of R prefer "hostA" and even-numbered 
> prefer "hostB". Then R.coalesce(50) will have 25 partitions with preference 
> "hostA" and 25 with "hostB" (even distribution).
>  * Assume partitions with index 0-49 of R prefer "hostA" and partitions with 
> index 50-99 prefer "hostB". Then R.coalesce(50) will have 49 partitions with 
> "hostA" and 1 with "hostB" (extremely skewed distribution).
> The algorithm in {{DefaultPartitionCoalescer.setupGroups}} is responsible for 
> picking preferred locations for coalesced partitions. It analyzes the 
> preferred locations of input partitions. It starts by trying to create one 
> partition for each unique location in the input. However, if the 
> requested number of coalesced partitions is higher than the number of unique 
> locations, it has to pick duplicate locations.
> Currently, the duplicate locations are picked by iterating over the input 
> partitions in order, and copying their preferred locations to coalesced 
> partitions. If the input partitions are clustered by location, this can 
> result in severe skew.
> Instead of iterating over the list of input partitions in order, we should 
> pick them at random.
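
The example in the description can be reproduced with a small toy simulation. 
This is only a sketch of the behaviour as described above, not the actual 
Spark code: the function {{pick_group_locations}} and its parameters are 
hypothetical names, each input partition is assumed to have exactly one 
preferred location, and random picks are assumed to be drawn with replacement.

```python
import random
from collections import Counter


def pick_group_locations(preferred, num_groups, rng=None):
    """Toy model: choose one preferred location per coalesced partition.

    preferred  -- preferred host of each input partition, in input order
                  (assumed non-empty)
    num_groups -- requested number of coalesced partitions
    rng        -- a random.Random instance: duplicates are copied from
                  randomly chosen input partitions; None: input partitions
                  are scanned in order
    """
    # Step 1: one coalesced partition per unique location (first-seen order).
    locations = list(dict.fromkeys(preferred))[:num_groups]

    # Step 2: fill the remaining slots by copying the preferred locations
    # of input partitions, either in input order or picked at random.
    remaining = num_groups - len(locations)
    for i in range(remaining):
        if rng is None:
            locations.append(preferred[i % len(preferred)])
        else:
            locations.append(rng.choice(preferred))
    return locations


# Odd-numbered input partitions prefer hostA, even-numbered prefer hostB.
interleaved = ["hostB" if i % 2 == 0 else "hostA" for i in range(100)]
# Partitions 0-49 prefer hostA, partitions 50-99 prefer hostB.
clustered = ["hostA"] * 50 + ["hostB"] * 50

print(Counter(pick_group_locations(interleaved, 50)))  # 25 hostA, 25 hostB
print(Counter(pick_group_locations(clustered, 50)))    # 49 hostA, 1 hostB
print(Counter(pick_group_locations(clustered, 50, random.Random(0))))
```

With in-order picking, the clustered input reproduces the 49/1 split from the 
example, because all 48 duplicate slots are filled from the first 48 input 
partitions. With random picking the split depends on the seed, but it is no 
longer tied to the order of the input partitions.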
