[
https://issues.apache.org/jira/browse/SPARK-31794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17810476#comment-17810476
]
Sean R. Owen commented on SPARK-31794:
--------------------------------------
Not that it helps, but I observe the same behavior, and it must be a bug. The
result is exactly as expected, except that the first partition is 2x the size
of the others and the last partition is empty. I tried tweaks to identify the
problem (e.g. what if I increase the desired partition count by 1, or add 1 to
my indices?), but none of them helped.
I don't have a fix or further insight; just adding that, yes, this seems to be
a problem.
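A hedged sketch of one way the "first partition 2x, last partition empty" shape can arise (an illustration with assumed boundary values, not Spark's actual RangePartitioner code): if the sampled range boundaries for the 24 distinct keys 0..23 came out one step too high, e.g. (2, 3, ..., 24) instead of (1, 2, ..., 23), then keys 0 and 1 would share the first partition and no key would reach the last:

```scala
object RangeBoundarySketch extends App {
  // Hypothetical simplified range partitioner: 23 upper boundaries -> 24 buckets.
  // Spark derives its boundaries by sampling; these values are an assumption
  // chosen to reproduce the reported skew, not what Spark actually computes.
  val bounds = (2 to 24).toVector

  // A key goes to the first bucket whose boundary is strictly greater than it.
  def bucket(key: Int): Int = bounds.indexWhere(key < _) match {
    case -1 => bounds.length // beyond the last boundary
    case i  => i
  }

  val keys = 0 until 24 // the distinct values of val % 24 from the repro
  val perBucket = keys.groupBy(bucket).map { case (b, ks) => b -> ks.size }

  println(perBucket(0))               // prints 2: keys 0 and 1 collide here
  println(perBucket.getOrElse(23, 0)) // prints 0: nothing reaches the last bucket
}
```

Whether Spark's boundary sampling really produces such an off-by-one is exactly what this ticket needs to establish; the sketch only shows that the observed shape is consistent with it.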
> Incorrect distribution with repartitionByRange and repartition column
> expression
> --------------------------------------------------------------------------------
>
> Key: SPARK-31794
> URL: https://issues.apache.org/jira/browse/SPARK-31794
> Project: Spark
> Issue Type: Bug
> Components: Spark Core
> Affects Versions: 2.3.2, 2.4.5, 3.0.1
> Environment: Sample code for obtaining the test results below.
> // run in spark-shell
> import java.io.File
> import java.io.PrintWriter
> import org.apache.spark.sql.functions.spark_partition_id
> val logfile = "/tmp/sparkdftest.log"
> val writer = new PrintWriter(logfile)
> writer.println("Spark Version " + sc.version)
> val df = Range(1, 1002).toDF("val")
> writer.println("Default Partition Length:" + df.rdd.partitions.length)
> writer.println("Default Partition getNumPartitions:" + df.rdd.getNumPartitions)
> writer.println("Default Partition groupBy spark_partition_id:" + df.groupBy(spark_partition_id()).count().rdd.partitions.length)
> val dfcount = df.mapPartitions { part => Iterator(part.size) }
> writer.println("Default Partition:" + dfcount.collect().toList)
> val numparts = 24
> val dfparts_range = df.withColumn("partid", $"val" % numparts).repartitionByRange(numparts, $"partid")
> writer.println("repartitionByRange Length:" + dfparts_range.rdd.partitions.length)
> writer.println("repartitionByRange getNumPartitions:" + dfparts_range.rdd.getNumPartitions)
> writer.println("repartitionByRange groupBy spark_partition_id:" + dfparts_range.groupBy(spark_partition_id()).count().rdd.partitions.length)
> val dfpartscount = dfparts_range.mapPartitions { part => Iterator(part.size) }
> writer.println("repartitionByRange: " + dfpartscount.collect().toList)
> val dfparts_expr = df.withColumn("partid", $"val" % numparts).repartition(numparts, $"partid")
> writer.println("repartition by column expr Length:" + dfparts_expr.rdd.partitions.length)
> writer.println("repartition by column expr getNumPartitions:" + dfparts_expr.rdd.getNumPartitions)
> writer.println("repartition by column expr groupBy spark_partition_id:" + dfparts_expr.groupBy(spark_partition_id()).count().rdd.partitions.length)
> val dfpartscount_expr = dfparts_expr.mapPartitions { part => Iterator(part.size) }
> writer.println("repartition by column expr:" + dfpartscount_expr.collect().toList)
> writer.close()
> Reporter: Ramesha Bhatta
> Priority: Major
> Labels: performance
>
> Both repartitionByRange and repartition(<num>, <column>) produce a skewed
> distribution across the resulting partitions.
>
> With range partitioning, one partition receives 2x the average volume and the
> last one is empty. With repartition by column expression it is worse: some
> partitions receive 4x or 2x the average, and many partitions are empty.
>
> This distribution imbalance can cause performance problems in a concurrent
> environment.
> Details from testing on 3 different versions:
> |Version 2.3.2|Version 2.4.5|Version 3.0 Preview2|
> |Spark Version 2.3.2.3.1.4.0-315|Spark Version 2.4.5|Spark Version 3.0.0-preview2|
> |Default Partition Length:2|Default Partition Length:2|Default Partition Length:80|
> |Default Partition getNumPartitions:2|Default Partition getNumPartitions:2|Default Partition getNumPartitions:80|
> |Default Partition groupBy spark_partition_id:200|Default Partition groupBy spark_partition_id:200|Default Partition groupBy spark_partition_id:200|
> |repartitionByRange Length:24|repartitionByRange Length:24|repartitionByRange Length:24|
> |repartitionByRange getNumPartitions:24|repartitionByRange getNumPartitions:24|repartitionByRange getNumPartitions:24|
> |repartitionByRange groupBy spark_partition_id:200|repartitionByRange groupBy spark_partition_id:200|repartitionByRange groupBy spark_partition_id:200|
> |repartitionByRange: List(83, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 41, 41, 41, 41, 41, 41, 0)|repartitionByRange: List(83, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 41, 41, 41, 41, 41, 41, 0)|repartitionByRange: List(83, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 42, 41, 41, 41, 41, 41, 41, 0)|
> |repartition by column expr Length:24|repartition by column expr Length:24|repartition by column expr Length:24|
> |repartition by column expr getNumPartitions:24|repartition by column expr getNumPartitions:24|repartition by column expr getNumPartitions:24|
> |repartition by column expr groupBy spark_partition_id:200|repartition by column expr groupBy spark_partition_id:200|repartition by column expr groupBy spark_partition_id:200|
> |repartition by column expr:List(83, 42, 0, 84, 0, 42, 125, 0, 42, 84, 0, 42, 0, 82, 0, 124, 42, 83, 84, 42, 0, 0, 0, 0)|repartition by column expr:List(83, 42, 0, 84, 0, 42, 125, 0, 42, 84, 0, 42, 0, 82, 0, 124, 42, 83, 84, 42, 0, 0, 0, 0)|repartition by column expr:List(83, 42, 0, 84, 0, 42, 125, 0, 42, 84, 0, 42, 0, 82, 0, 124, 42, 83, 84, 42, 0, 0, 0, 0)|
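The repartition-by-expression numbers (some partitions at 2x/4x the average, many empty) are the shape one would expect from hash partitioning: repartition(n, col) routes rows by hash(col) mod n, and hashing only 24 distinct partid values into 24 buckets almost inevitably collides. A Spark-free sketch of the effect, using scala.util.hashing.MurmurHash3.stringHash as a stand-in for Spark's internal hash (the exact hash differs, so the particular bucket pattern will too, but the collision mechanism is the same):

```scala
import scala.util.hashing.MurmurHash3

object HashSkewSketch extends App {
  val numParts = 24

  // 1001 rows with partid = val % 24, mirroring the reproduction above.
  val partids = (1 to 1001).map(_ % numParts)

  // Stand-in for hash partitioning: bucket = hash(partid) mod numParts.
  // (Spark uses its own Murmur3 over the unsafe row; this is an assumption
  // made only to illustrate the collision effect.)
  def bucketOf(partid: Int): Int =
    Math.floorMod(MurmurHash3.stringHash(partid.toString), numParts)

  val sizes = partids.groupBy(bucketOf).map { case (b, rows) => b -> rows.size }

  // Only 24 distinct keys feed the hash, so some buckets receive several keys
  // (2x/4x volume) while others receive none at all.
  println(s"non-empty buckets: ${sizes.size} of $numParts")
  println(s"rows accounted for: ${sizes.values.sum}")
}
```

This also suggests why the skew is deterministic across Spark versions: the hash of each distinct partid is fixed, so the same keys collide every run.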
--
This message was sent by Atlassian Jira
(v8.20.10#820010)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]