[ https://issues.apache.org/jira/browse/SPARK-40407?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Wenchen Fan reassigned SPARK-40407:
-----------------------------------

    Assignee: Bobby Wang

> Repartition of DataFrame can result in severe data skew in some special cases
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-40407
>                 URL: https://issues.apache.org/jira/browse/SPARK-40407
>             Project: Spark
>          Issue Type: Improvement
>          Components: SQL
>    Affects Versions: 3.0.1, 3.1.1, 3.1.2, 3.1.3, 3.2.1, 3.3.0, 3.2.2
>            Reporter: Bobby Wang
>            Assignee: Bobby Wang
>            Priority: Major
>             Fix For: 3.4.0, 3.3.1, 3.2.3
>
> {code:scala}
> val df = spark.range(0, 100, 1, 50).repartition(4)
> val v = df.rdd.mapPartitions { iter =>
>   Iterator.single(iter.length)
> }.collect()
> println(v.mkString(","))
> {code}
> The simple code above outputs `50,0,0,50`, which means there is no data in
> partitions 1 and 2.
>
> I debugged it and found that the round-robin partitioning only distributes
> records evenly *within* each input partition; it does not guarantee an even
> distribution *across* partitions.
>
> Below is the code that generates the key:
> {code:scala}
> case RoundRobinPartitioning(numPartitions) =>
>   // Distributes elements evenly across output partitions, starting
>   // from a random partition.
>   var position = new Random(TaskContext.get().partitionId()).nextInt(numPartitions)
>   (row: InternalRow) => {
>     // The HashPartitioner will handle the `mod` by the number of partitions
>     position += 1
>     position
>   }
> {code}
> In this case there are 50 input partitions and each partition holds only 2
> elements. The issue is that the round-robin walk always starts at
> *position = 2* here.
>
> See the output of Random:
> {code:scala}
> scala> (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(4) + " "))  // the position is always 2
> 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 2 ...
> (all 200 values are 2)
> {code}
> Similarly, each of the Random calls below also prints the same value for
> every seed:
> {code:scala}
> (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(2) + " "))
> (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(4) + " "))
> (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(8) + " "))
> (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(16) + " "))
> (1 to 200).foreach(partitionId => print(new Random(partitionId).nextInt(32) + " "))
> {code}
> Let's go back to this case.
>
> Consider partition 0, whose elements are [0, 1]. At shuffle write, element 0
> increments the position from 2 to 3, so its key is 3 % 4 = 3; element 1
> increments the position to 4, so its key is 4 % 4 = 0.
>
> Consider partition 1, whose elements are [2, 3]. The same thing happens:
> element 2 gets key 3 % 4 = 3, and element 3 gets key 4 % 4 = 0.
>
> The same calculation applies to all of the remaining partitions, since the
> starting position is always 2 in this case.
>
> So, as you can see, every input partition writes its elements only to output
> partitions 0 and 3, which leaves partitions 1 and 2 without any data.
>
> I will try to provide a patch to fix this issue.

--
This message was sent by Atlassian Jira
(v8.20.10#820010)
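The skew described above can be reproduced without Spark at all. Below is a minimal Java sketch (`scala.util.Random` delegates to `java.util.Random`, which is why the same seeds give the same values) that replays the key assignment from the quoted `RoundRobinPartitioning` snippet: one `Random` seeded with each input partition id, one increment per row, and `HashPartitioner`'s mod. The class and method names are my own, not Spark's.

```java
import java.util.Arrays;
import java.util.Random;

public class RoundRobinSkew {

    // Replays the shuffle-write key assignment for
    // spark.range(0, 100, 1, 50).repartition(4):
    // 50 input partitions, 2 rows each, 4 output partitions.
    static long[] simulate(int inputPartitions, int rowsPerPartition, int outputPartitions) {
        long[] counts = new long[outputPartitions];
        for (int pid = 0; pid < inputPartitions; pid++) {
            // Spark seeds the per-task Random with the partition id.
            int position = new Random(pid).nextInt(outputPartitions);
            for (int row = 0; row < rowsPerPartition; row++) {
                position += 1;                          // round-robin step
                counts[position % outputPartitions]++;  // HashPartitioner's mod
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // Matches the `50,0,0,50` observed in the issue description: every
        // seed 0..49 yields the same starting position, so only two of the
        // four output partitions ever receive data.
        System.out.println(Arrays.toString(simulate(50, 2, 4)));
    }
}
```

Any fix needs starting positions that actually vary across tasks; seeding `java.util.Random` with small sequential integers does not achieve that, because its first `nextInt` on a small power-of-two bound is strongly correlated with the seed, as the 200-seed experiment above shows.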