Bobby Wang created SPARK-40407:
----------------------------------

             Summary: Repartition of DataFrame can result in severe data skew 
in some special case
                 Key: SPARK-40407
                 URL: https://issues.apache.org/jira/browse/SPARK-40407
             Project: Spark
          Issue Type: Improvement
          Components: SQL
    Affects Versions: 3.2.2, 3.3.0, 3.2.1, 3.1.3, 3.1.2, 3.1.1, 3.0.1
            Reporter: Bobby Wang


``` scala
val df = spark.range(0, 100, 1, 50).repartition(4)
val v = df.rdd.mapPartitions { iter =>
  Iterator.single(iter.length)
}.collect()
println(v.mkString(","))
```


The above simple code outputs `50,0,0,50`, meaning partitions 1 and 2 receive no data at all.

 

I debugged it and found that RoundRobin only distributes the records evenly **within the same input partition**; it does not guarantee an even distribution **across** partitions.


Below is the code that generates the key:

``` scala
case RoundRobinPartitioning(numPartitions) =>
  // Distributes elements evenly across output partitions, starting from a random partition.
  var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions)
  (row: InternalRow) => {
    // The HashPartitioner will handle the `mod` by the number of partitions
    position += 1
    position
  }
```

In this case, there are 50 input partitions and each computes only 2 elements. The issue with RoundRobin here is that every task starts the round-robin from *position = 2*.

See the output of Random:
``` scala
scala> (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(4) + " "))  // the first draw is always 2
2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 
2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 
2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 
2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 
2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 
```

Similarly, each of the Random calls below prints one repeated value for seeds 1 to 200:

``` scala
(1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(2) + " "))
(1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(4) + " "))
(1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(8) + " "))
(1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(16) + " "))
(1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(32) + " "))
```
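The claim can be checked mechanically. The snippet below (a standalone sketch; `firstDraws` is a local name, not Spark code) collects the first `nextInt` draw for seeds 1 to 200 under each power-of-two bound:

``` scala
import java.util.Random

// For each power-of-two bound, collect the set of first nextInt draws
// produced by seeds 1..200.
val firstDraws: Map[Int, Set[Int]] =
  Seq(2, 4, 8, 16, 32).map { bound =>
    bound -> (1 to 200).map(seed => new Random(seed).nextInt(bound)).toSet
  }.toMap

// Each bound collapses to a single value, e.g. nextInt(4) is always 2.
firstDraws.toSeq.sortBy(_._1).foreach { case (bound, values) =>
  println(s"nextInt($bound) first draws: ${values.mkString(",")}")
}
```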

 

Let's go back to this case.

Consider partition 0, whose elements are [0, 1]. At shuffle-write time, element 0 gets key (position + 1) = (2 + 1) % 4 = 3, and element 1 gets key (3 + 1) % 4 = 0.
Likewise for partition 1, whose elements are [2, 3]: element 2 gets key (2 + 1) % 4 = 3, and element 3 gets key (3 + 1) % 4 = 0.

The same calculation applies to all the remaining partitions, since the starting position is always 2 in this case.

So, as you can see, every input partition writes its elements only to output partitions [0, 3], which leaves output partitions [1, 2] without any data.
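The whole walk-through can be reproduced without Spark by simulating the key generation for all 50 input partitions (a standalone sketch; `counts` and `numPartitions` are local names, not Spark internals):

``` scala
import java.util.Random

// Standalone simulation of the key generation above: 50 input partitions,
// 2 rows each, repartitioned to 4 output partitions.
val numPartitions = 4
val counts = Array.fill(numPartitions)(0)
for (partitionId <- 0 until 50) {
  // Same seeding as the Spark snippet: every task starts at position 2 here.
  var position = new Random(partitionId).nextInt(numPartitions)
  for (_ <- 0 until 2) { // each input partition holds exactly 2 rows
    position += 1
    counts(position % numPartitions) += 1 // HashPartitioner's mod
  }
}
println(counts.mkString(",")) // prints 50,0,0,50
```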

 

I will try to provide a patch to fix this issue.
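One possible direction, sketched here purely as an illustration and not necessarily the patch that will be proposed: decorrelate the seeds before drawing the starting position. `startPosition` is a hypothetical helper; `byteswap32` is a seed-scrambling hash Spark already uses elsewhere (e.g. for per-partition sampling seeds):

``` scala
import java.util.Random
import scala.util.hashing.byteswap32

// Hypothetical sketch: hash the partition id before seeding the Random,
// so that small consecutive ids no longer share the same first draw.
def startPosition(partitionId: Int, numPartitions: Int): Int =
  new Random(byteswap32(partitionId)).nextInt(numPartitions)

// The starting positions for 50 input partitions and 4 output partitions.
val starts = (0 until 50).map(startPosition(_, 4))
println(starts.distinct.sorted.mkString(","))
```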



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
