[
https://issues.apache.org/jira/browse/SPARK-40407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Erik Krogen updated SPARK-40407:
--------------------------------
Description:
{code:scala}
val df = spark.range(0, 100, 1, 50).repartition(4)
val v = df.rdd.mapPartitions { iter =>
  Iterator.single(iter.length)
}.collect()
println(v.mkString(","))
{code}
The above simple code outputs `50,0,0,50`, which means partitions 1 and 2 receive no data.
I debugged it and found that the RoundRobin partitioning only ensures that the records
*within the same input partition* are distributed evenly; it gives no such guarantee
*across* input partitions.
Below is the code that generates the partitioning key:
{code:scala}
case RoundRobinPartitioning(numPartitions) =>
  // Distributes elements evenly across output partitions, starting from
  // a random partition.
  var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions)
  (row: InternalRow) => {
    // The HashPartitioner will handle the `mod` by the number of partitions
    position += 1
    position
  }
{code}
In this case there are 50 input partitions and each one computes only 2 elements. The
problem is that every RoundRobin task starts from *position = 2*, because
`new Random(partitionId).nextInt(4)` returns the same first value for all of these seeds.
See the output of Random:
{code:scala}
scala> (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(4) + " ")) // the first value is always 2.
2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2
{code}
Similarly, each of the calls below also prints a single repeated value for all seeds:
{code:scala}
(1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(2) + " "))
(1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(4) + " "))
(1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(8) + " "))
(1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(16) + " "))
(1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(32) + " "))
{code}
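This bias can also be checked outside Spark, since `scala.util.Random` delegates to
`java.util.Random`; a minimal standalone Java sketch (names here are mine, not from Spark):

```java
import java.util.Random;
import java.util.stream.IntStream;

public class SeedBiasCheck {
    // First value drawn from each seed 1..200 with the given bound,
    // mirroring the snippets above.
    static int[] firstDraws(int bound) {
        return IntStream.rangeClosed(1, 200)
                .map(seed -> new Random(seed).nextInt(bound))
                .toArray();
    }

    public static void main(String[] args) {
        int[] draws = firstDraws(4);
        long distinct = IntStream.of(draws).distinct().count();
        // Every seed produces the same first value.
        System.out.println(distinct + " distinct value(s), first draw = " + draws[0]);
    }
}
```

Small sequential seeds pass through only one step of the linear congruential
generator before the first draw, which is why their high bits, and hence the
first `nextInt` result, barely differ.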
Let's go back to this case.
Consider partition 0, which holds elements [0, 1]. At shuffle write time the starting
position is 2, so for element 0 the key is (position + 1) = 3, and 3 % 4 = 3; for
element 1 the key is (position + 1) = 4, and 4 % 4 = 0.
Consider partition 1, which holds elements [2, 3]: it behaves identically, so element 2
gets key 3 % 4 = 3 and element 3 gets key 4 % 4 = 0.
The same calculation applies to all the remaining partitions, since the starting
position is always 2 in this case.
So, as you can see, every input partition writes its elements only to output partitions
0 and 3, which leaves partitions 1 and 2 without any data.
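The arithmetic above can be reproduced end to end with a small standalone simulation.
This is a plain-Java stand-in that mirrors the partitioner logic, not Spark's actual
code path (`scala.util.Random` delegates to `java.util.Random`, so the seeds behave
the same):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;

public class RoundRobinSkewSim {
    // Counts how many records land on each output partition when every
    // input partition assigns keys round-robin from its own seeded start.
    static Map<Integer, Integer> simulate(int inputPartitions,
                                          int elementsPerPartition,
                                          int numPartitions) {
        Map<Integer, Integer> counts = new HashMap<>();
        for (int partitionId = 0; partitionId < inputPartitions; partitionId++) {
            // Starting position, as in RoundRobinPartitioning (always 2 here).
            int position = new Random(partitionId).nextInt(numPartitions);
            for (int i = 0; i < elementsPerPartition; i++) {
                position += 1;
                int key = position % numPartitions; // HashPartitioner's mod step
                counts.merge(key, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Prints {0=50, 3=50}: partitions 1 and 2 receive nothing.
        System.out.println(simulate(50, 2, 4));
    }
}
```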
I will try to provide the patch to fix this issue.
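As a purely hypothetical sketch of one possible direction (my assumption, not
necessarily the eventual patch): scrambling the partition id before seeding
decorrelates the starting positions. The `mix` function below is an illustrative
avalanche mix in the style of MurmurHash3's finalizer, not anything Spark currently
uses:

```java
import java.util.Random;

public class ScrambledSeedSketch {
    // Invertible avalanche mix: nearby inputs map to widely separated outputs.
    static int mix(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        h *= 0xc2b2ae35;
        h ^= h >>> 16;
        return h;
    }

    // Starting position with a scrambled seed instead of the raw partition id.
    static int startPosition(int partitionId, int numPartitions) {
        return new Random(mix(partitionId)).nextInt(numPartitions);
    }

    public static void main(String[] args) {
        StringBuilder sb = new StringBuilder();
        for (int p = 0; p < 50; p++) {
            sb.append(startPosition(p, 4)).append(' ');
        }
        System.out.println(sb);
    }
}
```

With the seeds spread across the full int range, the 50 starting positions are
expected to vary instead of all landing on 2, so the round-robin offsets of the
tasks no longer line up.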
> Repartition of DataFrame can result in severe data skew in some special case
> ----------------------------------------------------------------------------
>
> Key: SPARK-40407
> URL: https://issues.apache.org/jira/browse/SPARK-40407
> Project: Spark
> Issue Type: Improvement
> Components: SQL
> Affects Versions: 3.0.1, 3.1.1, 3.1.2, 3.1.3, 3.2.1, 3.3.0, 3.2.2
> Reporter: Bobby Wang
> Priority: Major
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]