Repository: spark Updated Branches: refs/heads/branch-2.3 2858eaafa -> a81ace196
[SPARK-23207][SQL][FOLLOW-UP] Don't perform local sort for DataFrame.repartition(1) ## What changes were proposed in this pull request? In `ShuffleExchangeExec`, we don't need to insert extra local sort before round-robin partitioning, if the new partitioning has only 1 partition, because under that case all output rows go to the same partition. ## How was this patch tested? The existing test cases. Author: Xingbo Jiang <[email protected]> Closes #20426 from jiangxb1987/repartition1. (cherry picked from commit b375397b1678b7fe20a0b7f87a7e8b37ae5646ef) Signed-off-by: Wenchen Fan <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/spark/repo Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a81ace19 Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a81ace19 Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a81ace19 Branch: refs/heads/branch-2.3 Commit: a81ace196ec41b1b3f739294c965df270ee2ddd2 Parents: 2858eaa Author: Xingbo Jiang <[email protected]> Authored: Tue Jan 30 11:40:42 2018 +0800 Committer: Wenchen Fan <[email protected]> Committed: Tue Jan 30 11:41:21 2018 +0800 ---------------------------------------------------------------------- .../spark/sql/execution/exchange/ShuffleExchangeExec.scala | 4 ++++ .../apache/spark/sql/execution/streaming/ForeachSinkSuite.scala | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/spark/blob/a81ace19/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala index 76c1fa6..4d95ee3 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/exchange/ShuffleExchangeExec.scala @@ -257,7 +257,11 @@ object ShuffleExchangeExec { // // Currently we following the most straight-forward way that perform a local sort before // partitioning. + // + // Note that we don't perform local sort if the new partitioning has only 1 partition, under + // that case all output rows go to the same partition. val newRdd = if (SQLConf.get.sortBeforeRepartition && + newPartitioning.numPartitions > 1 && newPartitioning.isInstanceOf[RoundRobinPartitioning]) { rdd.mapPartitionsInternal { iter => val recordComparatorSupplier = new Supplier[RecordComparator] { http://git-wip-us.apache.org/repos/asf/spark/blob/a81ace19/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala ---------------------------------------------------------------------- diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala index 1248c67..41434e6 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/ForeachSinkSuite.scala @@ -162,7 +162,7 @@ class ForeachSinkSuite extends StreamTest with SharedSQLContext with BeforeAndAf val allEvents = ForeachSinkSuite.allEvents() assert(allEvents.size === 1) assert(allEvents(0)(0) === ForeachSinkSuite.Open(partition = 0, version = 0)) - assert(allEvents(0)(1) === ForeachSinkSuite.Process(value = 2)) + assert(allEvents(0)(1) === ForeachSinkSuite.Process(value = 1)) // `close` should be called with the error val errorEvent = allEvents(0)(2).asInstanceOf[ForeachSinkSuite.Close] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
