Repository: spark Updated Branches: refs/heads/master f5af86ea7 -> 048197749
[SPARK-22144][SQL] ExchangeCoordinator combine the partitions of an 0 sized pre-shuffle to 0 ## What changes were proposed in this pull request? when the length of pre-shuffle's partitions is 0, the length of post-shuffle's partitions should be 0 instead of spark.sql.shuffle.partitions. ## How was this patch tested? ExchangeCoordinator converted a pre-shuffle that partitions is 0 to a post-shuffle that partitions is 0 instead of one that partitions is spark.sql.shuffle.partitions. Author: liutang123 <[email protected]> Closes #19364 from liutang123/SPARK-22144. Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/04819774 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/04819774 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/04819774 Branch: refs/heads/master Commit: 048197749ef990e4def1fcbf488f3ded38d95cae Parents: f5af86e Author: liutang123 <[email protected]> Authored: Mon Jun 11 17:48:07 2018 -0700 Committer: Wenchen Fan <[email protected]> Committed: Mon Jun 11 17:48:07 2018 -0700 ---------------------------------------------------------------------- .../spark/sql/execution/exchange/ExchangeCoordinator.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/04819774/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala index 78f11ca..051e610 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ExchangeCoordinator.scala @@ -232,16 +232,16 @@ class ExchangeCoordinator( // number of post-shuffle partitions. val partitionStartIndices = if (mapOutputStatistics.length == 0) { - None + Array.empty[Int] } else { - Some(estimatePartitionStartIndices(mapOutputStatistics)) + estimatePartitionStartIndices(mapOutputStatistics) } var k = 0 while (k < numExchanges) { val exchange = exchanges(k) val rdd = - exchange.preparePostShuffleRDD(shuffleDependencies(k), partitionStartIndices) + exchange.preparePostShuffleRDD(shuffleDependencies(k), Some(partitionStartIndices)) newPostShuffleRDDs.put(exchange, rdd) k += 1 --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
