Repository: spark
Updated Branches:
  refs/heads/master 29372b631 -> a6d7b61f9
[SPARK-4479][SQL] Avoids unnecessary defensive copies when sort based shuffle is on

This PR is a workaround for SPARK-4479. It introduces two changes that take effect when merge sort is bypassed in `ExternalSorter`:

1. also bypass the buffering of RDD elements, since that buffering is the reason `MutableRow`-backed row objects must be copied, and
2. avoid the defensive copies in the `Exchange` operator.

Author: Cheng Lian <[email protected]>

Closes #3422 from liancheng/avoids-defensive-copies and squashes the following commits:

591f2e9 [Cheng Lian] Passes all shuffle suites
0c3c91e [Cheng Lian] Fixes shuffle write metrics when merge sort is bypassed
ed5df3c [Cheng Lian] Fixes styling changes
f75089b [Cheng Lian] Avoids unnecessary defensive copies when sort based shuffle is on


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a6d7b61f
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a6d7b61f
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a6d7b61f

Branch: refs/heads/master
Commit: a6d7b61f92dc7c1f9632cecb232afa8040ab2b4d
Parents: 29372b6
Author: Cheng Lian <[email protected]>
Authored: Mon Nov 24 12:43:45 2014 -0800
Committer: Michael Armbrust <[email protected]>
Committed: Mon Nov 24 12:43:45 2014 -0800

----------------------------------------------------------------------
 .../spark/util/collection/ExternalSorter.scala | 23 +++++++++++++++++---
 .../scala/org/apache/spark/ShuffleSuite.scala  | 12 +++++-----
 .../apache/spark/sql/execution/Exchange.scala  | 16 +++++++++++++-
 3 files changed, 41 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a6d7b61f/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
index c617ff5..15bda1c 100644
--- a/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
+++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalSorter.scala
@@ -205,6 +205,13 @@ private[spark] class ExternalSorter[K, V, C](
         map.changeValue((getPartition(kv._1), kv._1), update)
         maybeSpillCollection(usingMap = true)
       }
+    } else if (bypassMergeSort) {
+      // SPARK-4479: Also bypass buffering if merge sort is bypassed to avoid defensive copies
+      if (records.hasNext) {
+        spillToPartitionFiles(records.map { kv =>
+          ((getPartition(kv._1), kv._1), kv._2.asInstanceOf[C])
+        })
+      }
     } else {
       // Stick values into our buffer
       while (records.hasNext) {
@@ -336,6 +343,10 @@ private[spark] class ExternalSorter[K, V, C](
    * @param collection whichever collection we're using (map or buffer)
    */
   private def spillToPartitionFiles(collection: SizeTrackingPairCollection[(Int, K), C]): Unit = {
+    spillToPartitionFiles(collection.iterator)
+  }
+
+  private def spillToPartitionFiles(iterator: Iterator[((Int, K), C)]): Unit = {
     assert(bypassMergeSort)
 
     // Create our file writers if we haven't done so yet
@@ -350,9 +361,9 @@ private[spark] class ExternalSorter[K, V, C](
       }
     }
 
-    val it = collection.iterator  // No need to sort stuff, just write each element out
-    while (it.hasNext) {
-      val elem = it.next()
+    // No need to sort stuff, just write each element out
+    while (iterator.hasNext) {
+      val elem = iterator.next()
       val partitionId = elem._1._1
       val key = elem._1._2
       val value = elem._2
@@ -748,6 +759,12 @@ private[spark] class ExternalSorter[K, V, C](
 
     context.taskMetrics.memoryBytesSpilled += memoryBytesSpilled
     context.taskMetrics.diskBytesSpilled += diskBytesSpilled
+    context.taskMetrics.shuffleWriteMetrics.filter(_ => bypassMergeSort).foreach { m =>
+      if (curWriteMetrics != null) {
+        m.shuffleBytesWritten += curWriteMetrics.shuffleBytesWritten
+        m.shuffleWriteTime += curWriteMetrics.shuffleWriteTime
+      }
+    }
 
     lengths
   }


http://git-wip-us.apache.org/repos/asf/spark/blob/a6d7b61f/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
index cda942e..85e5f9a 100644
--- a/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
+++ b/core/src/test/scala/org/apache/spark/ShuffleSuite.scala
@@ -95,14 +95,14 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
     sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
 
-    // 10 partitions from 4 keys
-    val NUM_BLOCKS = 10
+    // 201 partitions (greater than "spark.shuffle.sort.bypassMergeThreshold") from 4 keys
+    val NUM_BLOCKS = 201
     val a = sc.parallelize(1 to 4, NUM_BLOCKS)
     val b = a.map(x => (x, x*2))
 
     // NOTE: The default Java serializer doesn't create zero-sized blocks.
     //       So, use Kryo
-    val c = new ShuffledRDD[Int, Int, Int](b, new HashPartitioner(10))
+    val c = new ShuffledRDD[Int, Int, Int](b, new HashPartitioner(NUM_BLOCKS))
       .setSerializer(new KryoSerializer(conf))
 
     val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
@@ -122,13 +122,13 @@ abstract class ShuffleSuite extends FunSuite with Matchers with LocalSparkContex
     // Use a local cluster with 2 processes to make sure there are both local and remote blocks
     sc = new SparkContext("local-cluster[2,1,512]", "test", conf)
 
-    // 10 partitions from 4 keys
-    val NUM_BLOCKS = 10
+    // 201 partitions (greater than "spark.shuffle.sort.bypassMergeThreshold") from 4 keys
+    val NUM_BLOCKS = 201
     val a = sc.parallelize(1 to 4, NUM_BLOCKS)
     val b = a.map(x => (x, x*2))
 
     // NOTE: The default Java serializer should create zero-sized blocks
-    val c = new ShuffledRDD[Int, Int, Int](b, new HashPartitioner(10))
+    val c = new ShuffledRDD[Int, Int, Int](b, new HashPartitioner(NUM_BLOCKS))
 
     val shuffleId = c.dependencies.head.asInstanceOf[ShuffleDependency[_, _, _]].shuffleId
     assert(c.count === 4)


http://git-wip-us.apache.org/repos/asf/spark/blob/a6d7b61f/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index cff7a01..d7c811c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -41,11 +41,21 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
   /** We must copy rows when sort based shuffle is on */
   protected def sortBasedShuffleOn = SparkEnv.get.shuffleManager.isInstanceOf[SortShuffleManager]
 
+  private val bypassMergeThreshold =
+    child.sqlContext.sparkContext.conf.getInt("spark.shuffle.sort.bypassMergeThreshold", 200)
+
   override def execute() = attachTree(this , "execute") {
     newPartitioning match {
       case HashPartitioning(expressions, numPartitions) =>
         // TODO: Eliminate redundant expressions in grouping key and value.
-        val rdd = if (sortBasedShuffleOn) {
+        // This is a workaround for SPARK-4479. When:
+        // 1. sort based shuffle is on, and
+        // 2. the partition number is under the merge threshold, and
+        // 3. no ordering is required
+        // we can avoid the defensive copies to improve performance. In the long run, we probably
+        // want to include information in shuffle dependencies to indicate whether elements in the
+        // source RDD should be copied.
+        val rdd = if (sortBasedShuffleOn && numPartitions > bypassMergeThreshold) {
           child.execute().mapPartitions { iter =>
             val hashExpressions = newMutableProjection(expressions, child.output)()
             iter.map(r => (hashExpressions(r).copy(), r.copy()))
@@ -82,6 +92,10 @@ case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends Una
         shuffled.map(_._1)
 
       case SinglePartition =>
+        // SPARK-4479: Can't turn off defensive copy as what we do for `HashPartitioning`, since
+        // operators like `TakeOrdered` may require an ordering within the partition, and currently
+        // `SinglePartition` doesn't include ordering information.
+        // TODO Add `SingleOrderedPartition` for operators like `TakeOrdered`
         val rdd = if (sortBasedShuffleOn) {
           child.execute().mapPartitions { iter => iter.map(r => (null, r.copy())) }
         } else {

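Why the defensive copies are needed at all may not be obvious from the diff alone. Below is a minimal, self-contained Scala sketch, not Spark code: `MutableRecord` and `reusingIterator` are invented stand-ins for a `MutableRow`-backed row and an upstream operator that reuses it. It shows how buffering elements from an iterator that recycles a single mutable object silently corrupts the buffered data unless each element is copied first. The bypass-merge-sort path added above sidesteps the problem differently, by streaming each record straight to its partition file instead of buffering it, which is what lets `Exchange` skip the copies when the partition count does not exceed `spark.shuffle.sort.bypassMergeThreshold`.

```scala
// Illustrative sketch only (not from the Spark code base).
object DefensiveCopySketch {
  // Stand-in for a MutableRow-backed row that an operator reuses for every output element.
  final class MutableRecord(var value: Int) {
    def copy(): MutableRecord = new MutableRecord(value)
    override def toString: String = s"MutableRecord($value)"
  }

  // Simulates an upstream operator that returns the *same* object from every next() call.
  def reusingIterator(n: Int): Iterator[MutableRecord] = {
    val shared = new MutableRecord(0)
    Iterator.tabulate(n) { i => shared.value = i; shared }
  }

  def main(args: Array[String]): Unit = {
    // Buffering without copying: every buffered slot holds a reference to the same object,
    // so all slots end up reflecting the last value written.
    val buffered = reusingIterator(3).toArray
    println(buffered.mkString(", "))  // MutableRecord(2), MutableRecord(2), MutableRecord(2)

    // Copying each element before it is buffered, or never buffering at all (as the bypass
    // path does), preserves the distinct values.
    val copied = reusingIterator(3).map(_.copy()).toArray
    println(copied.mkString(", "))    // MutableRecord(0), MutableRecord(1), MutableRecord(2)
  }
}
```

Nothing in the sketch depends on Spark; it only mirrors the object-reuse contract that makes the copies in `Exchange` necessary whenever the shuffle path buffers rows.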
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]