Ramesha Bhatta created SPARK-31794:
--------------------------------------

             Summary: Incorrect distribution with repartitionByRange and 
repartition column expression
                 Key: SPARK-31794
                 URL: https://issues.apache.org/jira/browse/SPARK-31794
             Project: Spark
          Issue Type: Bug
          Components: Spark Core
    Affects Versions: 2.4.5, 2.3.2, 3.0.1
         Environment: Sample code used to obtain the test results below.

// Run in spark-shell, which has spark.implicits._ and
// org.apache.spark.sql.functions._ in scope.
import java.io.File
import java.io.PrintWriter
val logfile="/tmp/sparkdftest.log"
val writer = new PrintWriter(logfile) 
writer.println("Spark Version " + sc.version)
val df= Range(1, 1002).toDF("val")
writer.println("Default Partition Length:" + df.rdd.partitions.length)
writer.println("Default Partition getNumPartitions:" + df.rdd.getNumPartitions)
writer.println("Default Partition groupBy spark_partition_id:" + 
df.groupBy(spark_partition_id).count().rdd.partitions.length)
val dfcount = df.mapPartitions { part => Iterator(part.size) }
writer.println("Default Partition:" + dfcount.collect().toList)
val numparts=24
val dfparts_range=df.withColumn("partid", $"val" % 
numparts).repartitionByRange(numparts, $"partid")
writer.println("repartitionByRange Length:" + 
dfparts_range.rdd.partitions.length)
writer.println("repartitionByRange getNumPartitions:" + 
dfparts_range.rdd.getNumPartitions)
writer.println("repartitionByRange groupBy spark_partition_id:" + 
dfparts_range.groupBy(spark_partition_id).count().rdd.partitions.length)
val dfpartscount = dfparts_range.mapPartitions { part => Iterator(part.size) }
writer.println("repartitionByRange: " + dfpartscount.collect().toList)

val dfparts_expr=df.withColumn("partid", $"val" % 
numparts).repartition(numparts, $"partid")
writer.println("repartition by column expr Length:" + 
dfparts_expr.rdd.partitions.length)
writer.println("repartition by column expr getNumPartitions:" + 
dfparts_expr.rdd.getNumPartitions)
writer.println("repartition by column expr groupBy spark_partition_id:" + 
dfparts_expr.groupBy(spark_partition_id).count().rdd.partitions.length)
val dfpartscount_expr = dfparts_expr.mapPartitions { part => Iterator(part.size) }
writer.println("repartition by column expr:" + dfpartscount_expr.collect().toList)
writer.close()
            Reporter: Ramesha Bhatta


Both repartitionByRange and repartition(<num>, <column>) produce an incorrect 
distribution of rows across the resulting partitions.

With repartitionByRange, one partition receives roughly twice the average 
volume and the last one is empty. With repartition(<num>, <column>) the skew is 
worse: some partitions receive 4x or 2x the average volume while many are 
empty.

This distribution imbalance can cause performance problems in a concurrent 
environment.
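The hash-based case can be illustrated without Spark. Spark's repartition(n, col) assigns each row to bucket pmod(hash(col), n), so even when the column takes exactly n distinct values, several values can hash into the same bucket while other buckets stay empty. The sketch below is illustrative only: it uses scala.util.hashing.MurmurHash3 as a stand-in for Spark's internal Murmur3-based row hash, so the exact bucket loads will differ from Spark's, but the collision pattern is the point.

```scala
// Sketch: why hash repartitioning over a "perfect" key can still skew.
// MurmurHash3.stringHash is an assumption standing in for Spark's own
// Murmur3 hash of the column value; the mechanism (pmod of a hash) matches.
import scala.util.hashing.MurmurHash3

object HashSkewDemo extends App {
  val numparts = 24
  // One representative per distinct partid value 0..23, as in the repro.
  val buckets = (0 until numparts).map { partid =>
    val h = MurmurHash3.stringHash(partid.toString)
    ((h % numparts) + numparts) % numparts // pmod, like Spark's HashPartitioning
  }
  val counts = buckets.groupBy(identity).mapValues(_.size)
  println(s"non-empty buckets: ${counts.size} of $numparts")
  println(s"bucket loads (desc): ${counts.values.toList.sorted.reverse}")
}
```

With 24 keys hashed into 24 buckets, collisions are overwhelmingly likely, which is why the repro shows partitions at 2x and 4x the average alongside empty ones.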

Details from testing in 3 different versions.
|Version 2.3.2|Version 2.4.5|Version 3.0 Preview2|
|Spark Version 2.3.2.3.1.4.0-315|Spark Version 2.4.5|Spark Version 
3.0.0-preview2|
|Default Partition Length:2|Default Partition Length:2|Default Partition 
Length:80|
|Default Partition getNumPartitions:2|Default Partition 
getNumPartitions:2|Default Partition getNumPartitions:80|
|Default Partition groupBy spark_partition_id:200|Default Partition groupBy 
spark_partition_id:200|Default Partition groupBy spark_partition_id:200|
|repartitionByRange Length:24|repartitionByRange Length:24|repartitionByRange 
Length:24|
|repartitionByRange getNumPartitions:24|repartitionByRange 
getNumPartitions:24|repartitionByRange getNumPartitions:24|
|repartitionByRange groupBy spark_partition_id:200|repartitionByRange groupBy 
spark_partition_id:200|repartitionByRange groupBy spark_partition_id:200|
|repartitionByRange: List(83, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 
42, 42, 42, 42, 41, 41, 41, 41, 41, 41, 0)|repartitionByRange: List(83, 42, 42, 
42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 41, 41, 41, 41, 41, 41, 
0)|repartitionByRange: List(83, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 
42, 42, 42, 42, 41, 41, 41, 41, 41, 41, 0)|
|repartition by column expr Length:24|repartition by column expr 
Length:24|repartition by column expr Length:24|
|repartition by column expr getNumPartitions:24|repartition by column expr 
getNumPartitions:24|repartition by column expr getNumPartitions:24|
|repartition by column expr groupBy spark_partition_id:200|repartition by 
column expr groupBy spark_partition_id:200|repartition by column expr groupBy 
spark_partition_id:200|
|repartition by column expr:List(83, 42, 0, 84, 0, 42, 125, 0, 42, 84, 0, 42, 
0, 82, 0, 124, 42, 83, 84, 42, 0, 0, 0, 0)|repartition by column expr:List(83, 
42, 0, 84, 0, 42, 125, 0, 42, 84, 0, 42, 0, 82, 0, 124, 42, 83, 84, 42, 0, 0, 
0, 0)|repartition by column expr:List(83, 42, 0, 84, 0, 42, 125, 0, 42, 84, 0, 
42, 0, 82, 0, 124, 42, 83, 84, 42, 0, 0, 0, 0)|



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
