This is an automated email from the ASF dual-hosted git repository.
srowen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 21db433 [SPARK-27070] Fix performance bug in DefaultPartitionCoalescer
21db433 is described below
commit 21db4336b08fcb93779d72ebbb0251f3a2d08934
Author: fitermay <[email protected]>
AuthorDate: Thu Mar 14 20:13:18 2019 -0500
[SPARK-27070] Fix performance bug in DefaultPartitionCoalescer
When trying to coalesce a UnionRDD of two large FileScanRDDs
(each with a few million partitions) into around 8k partitions,
the driver can stall for over an hour.

Profiling shows that over 90% of the time is spent in TimSort,
which is invoked by `pickBin`. This patch replaces sorting with a more
efficient `min` for the purpose of finding the least occupied
PartitionGroup.
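To make the change concrete, here is a minimal, standalone Scala sketch of
the idea (the `Group` class and the sizes below are illustrative stand-ins,
not Spark's actual PartitionGroup or coalescer): sorting a whole collection
only to read its head costs O(n log n), while a single `min` scan under the
same Ordering is O(n) and yields the same least-loaded group.

    // Standalone sketch; Group stands in for a partition group with a load counter.
    case class Group(numPartitions: Int)

    object PickLeastLoaded {
      // Order groups by how many partitions they already hold.
      implicit val byLoad: Ordering[Group] = Ordering.by[Group, Int](_.numPartitions)

      def main(args: Array[String]): Unit = {
        val groups = Seq(Group(3), Group(1), Group(7), Group(2))

        // Old approach: sort everything, then take the head (O(n log n)).
        val viaSort = groups.sortBy(_.numPartitions).head

        // New approach: one linear scan finds the same group (O(n)).
        val viaMin = groups.min

        assert(viaSort == viaMin)
        println(s"least loaded group holds ${viaMin.numPartitions} partitions")
      }
    }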
Closes #23986 from fitermay/SPARK-27070.
Authored-by: fitermay <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
---
core/benchmarks/CoalescedRDDBenchmark-results.txt | 40 +++++++++++
.../scala/org/apache/spark/rdd/CoalescedRDD.scala | 34 ++++-----
.../apache/spark/rdd/CoalescedRDDBenchmark.scala | 80 ++++++++++++++++++++++
3 files changed, 137 insertions(+), 17 deletions(-)
diff --git a/core/benchmarks/CoalescedRDDBenchmark-results.txt b/core/benchmarks/CoalescedRDDBenchmark-results.txt
new file mode 100644
index 0000000..dd63b0a
--- /dev/null
+++ b/core/benchmarks/CoalescedRDDBenchmark-results.txt
@@ -0,0 +1,40 @@
+================================================================================================
+Coalesced RDD, large scale
+================================================================================================
+
+Java HotSpot(TM) 64-Bit Server VM 1.8.0_201-b09 on Windows 10 10.0
+Intel64 Family 6 Model 63 Stepping 2, GenuineIntel
+Coalesced RDD:                                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
+------------------------------------------------------------------------------------------------------------------------
+Coalesce Num Partitions: 100 Num Hosts: 1              346            364          24          0.3        3458.9       1.0X
+Coalesce Num Partitions: 100 Num Hosts: 5              258            264           6          0.4        2579.0       1.3X
+Coalesce Num Partitions: 100 Num Hosts: 10             242            249           7          0.4        2415.2       1.4X
+Coalesce Num Partitions: 100 Num Hosts: 20             237            242           7          0.4        2371.7       1.5X
+Coalesce Num Partitions: 100 Num Hosts: 40             230            231           1          0.4        2299.8       1.5X
+Coalesce Num Partitions: 100 Num Hosts: 80             222            233          14          0.4        2223.0       1.6X
+Coalesce Num Partitions: 500 Num Hosts: 1              659            665           5          0.2        6590.4       0.5X
+Coalesce Num Partitions: 500 Num Hosts: 5              340            381          47          0.3        3395.2       1.0X
+Coalesce Num Partitions: 500 Num Hosts: 10             279            307          47          0.4        2788.3       1.2X
+Coalesce Num Partitions: 500 Num Hosts: 20             259            261           2          0.4        2591.9       1.3X
+Coalesce Num Partitions: 500 Num Hosts: 40             241            250          15          0.4        2406.5       1.4X
+Coalesce Num Partitions: 500 Num Hosts: 80             235            237           3          0.4        2349.9       1.5X
+Coalesce Num Partitions: 1000 Num Hosts: 1            1050           1053           4          0.1       10503.2       0.3X
+Coalesce Num Partitions: 1000 Num Hosts: 5             405            407           2          0.2        4049.5       0.9X
+Coalesce Num Partitions: 1000 Num Hosts: 10            320            322           2          0.3        3202.7       1.1X
+Coalesce Num Partitions: 1000 Num Hosts: 20            276            277           0          0.4        2762.3       1.3X
+Coalesce Num Partitions: 1000 Num Hosts: 40            257            260           5          0.4        2571.2       1.3X
+Coalesce Num Partitions: 1000 Num Hosts: 80            245            252          13          0.4        2448.9       1.4X
+Coalesce Num Partitions: 5000 Num Hosts: 1            3099           3145          55          0.0       30988.6       0.1X
+Coalesce Num Partitions: 5000 Num Hosts: 5            1037           1050          20          0.1       10374.4       0.3X
+Coalesce Num Partitions: 5000 Num Hosts: 10            626            633           8          0.2        6261.8       0.6X
+Coalesce Num Partitions: 5000 Num Hosts: 20            426            431           5          0.2        4258.6       0.8X
+Coalesce Num Partitions: 5000 Num Hosts: 40            328            341          22          0.3        3275.4       1.1X
+Coalesce Num Partitions: 5000 Num Hosts: 80            272            275           4          0.4        2721.4       1.3X
+Coalesce Num Partitions: 10000 Num Hosts: 1           5516           5526           9          0.0       55156.8       0.1X
+Coalesce Num Partitions: 10000 Num Hosts: 5           1956           1992          48          0.1       19560.9       0.2X
+Coalesce Num Partitions: 10000 Num Hosts: 10          1045           1057          18          0.1       10447.4       0.3X
+Coalesce Num Partitions: 10000 Num Hosts: 20           637            658          24          0.2        6373.2       0.5X
+Coalesce Num Partitions: 10000 Num Hosts: 40           431            448          15          0.2        4312.9       0.8X
+Coalesce Num Partitions: 10000 Num Hosts: 80           326            328           2          0.3        3263.4       1.1X
+
+
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 94e7d0b..e006f63 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -58,7 +58,7 @@ private[spark] case class CoalescedRDDPartition(
val parentPreferredLocations = rdd.context.getPreferredLocs(rdd, p.index).map(_.host)
preferredLocation.exists(parentPreferredLocations.contains)
}
- if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
+ if (parents.isEmpty) 0.0 else loc.toDouble / parents.size.toDouble
}
}
@@ -91,7 +91,7 @@ private[spark] class CoalescedRDD[T: ClassTag](
pc.coalesce(maxPartitions, prev).zipWithIndex.map {
case (pg, i) =>
val ids = pg.partitions.map(_.index).toArray
- new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
+ CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
}
}
@@ -116,7 +116,7 @@ private[spark] class CoalescedRDD[T: ClassTag](
/**
* Returns the preferred machine for the partition. If split is of type CoalescedRDDPartition,
* then the preferred machine will be one which most parent splits prefer too.
- * @param partition
+ * @param partition the partition for which to retrieve the preferred machine, if exists
* @return the machine most preferred by split
*/
override def getPreferredLocations(partition: Partition): Seq[String] = {
@@ -156,9 +156,11 @@ private[spark] class CoalescedRDD[T: ClassTag](
private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
extends PartitionCoalescer {
- def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.numPartitions < o2.numPartitions
- def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
- if (o1 == None) false else if (o2 == None) true else compare(o1.get, o2.get)
+
+ implicit val partitionGroupOrdering: Ordering[PartitionGroup] =
+ (o1: PartitionGroup, o2: PartitionGroup) =>
+ java.lang.Integer.compare(o1.numPartitions, o2.numPartitions)
+
val rnd = new scala.util.Random(7919) // keep this class deterministic
@@ -178,7 +180,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host)
}
- class PartitionLocations(prev: RDD[_]) {
+ private class PartitionLocations(prev: RDD[_]) {
// contains all the partitions from the previous RDD that don't have preferred locations
val partsWithoutLocs = ArrayBuffer[Partition]()
@@ -213,15 +215,14 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
}
/**
- * Sorts and gets the least element of the list associated with key in groupHash
+ * Gets the least element of the list associated with key in groupHash
* The returned PartitionGroup is the least loaded of all groups that represent the machine "key"
*
* @param key string representing a partitioned group on preferred machine key
* @return Option of [[PartitionGroup]] that has least elements for key
*/
- def getLeastGroupHash(key: String): Option[PartitionGroup] = {
- groupHash.get(key).map(_.sortWith(compare).head)
- }
+ def getLeastGroupHash(key: String): Option[PartitionGroup] =
+ groupHash.get(key).filter(_.nonEmpty).map(_.min)
def addPartToPGroup(part: Partition, pgroup: PartitionGroup): Boolean = {
if (!initialHash.contains(part)) {
@@ -236,12 +237,12 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
* is assigned a preferredLocation. This uses coupon collector to estimate how many
* preferredLocations it must rotate through until it has seen most of the preferred
* locations (2 * n log(n))
- * @param targetLen
+ * @param targetLen The number of desired partition groups
*/
def setupGroups(targetLen: Int, partitionLocs: PartitionLocations) {
// deal with empty case, just create targetLen partition groups with no preferred location
if (partitionLocs.partsWithLocs.isEmpty) {
- (1 to targetLen).foreach(x => groupArr += new PartitionGroup())
+ (1 to targetLen).foreach(_ => groupArr += new PartitionGroup())
return
}
@@ -297,9 +298,8 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
partitionLocs: PartitionLocations): PartitionGroup = {
val slack = (balanceSlack * prev.partitions.length).toInt
// least loaded pref locs
- val pref = currPrefLocs(p, prev).map(getLeastGroupHash(_)).sortWith(compare)
- val prefPart = if (pref == Nil) None else pref.head
-
+ val pref = currPrefLocs(p, prev).flatMap(getLeastGroupHash)
+ val prefPart = if (pref.isEmpty) None else Some(pref.min)
val r1 = rnd.nextInt(groupArr.size)
val r2 = rnd.nextInt(groupArr.size)
val minPowerOfTwo = {
@@ -351,7 +351,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
val partIter = partitionLocs.partsWithLocs.iterator
groupArr.filter(pg => pg.numPartitions == 0).foreach { pg =>
while (partIter.hasNext && pg.numPartitions == 0) {
- var (nxt_replica, nxt_part) = partIter.next()
+ var (_, nxt_part) = partIter.next()
if (!initialHash.contains(nxt_part)) {
pg.partitions += nxt_part
initialHash += nxt_part
diff --git a/core/src/test/scala/org/apache/spark/rdd/CoalescedRDDBenchmark.scala b/core/src/test/scala/org/apache/spark/rdd/CoalescedRDDBenchmark.scala
new file mode 100644
index 0000000..42b3070
--- /dev/null
+++ b/core/src/test/scala/org/apache/spark/rdd/CoalescedRDDBenchmark.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.rdd
+
+import scala.collection.immutable
+
+import org.apache.spark.SparkContext
+import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
+
+/**
+ * Benchmark for CoalescedRDD.
+ * Measures rdd.coalesce performance under various combinations of
+ * coalesced partitions and preferred hosts
+ * To run this benchmark:
+ * {{{
+ * 1. without sbt:
+ * bin/spark-submit --class <this class> --jars <spark core test jar>
+ * 2. build/sbt "core/test:runMain <this class>"
+ * 3. generate result:
+ * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain <this class>"
+ * Results will be written to "benchmarks/CoalescedRDDBenchmark-results.txt".
+ * }}}
+ * */
+object CoalescedRDDBenchmark extends BenchmarkBase {
+ val seed = 0x1337
+ val sc = new SparkContext(master = "local[4]", appName = "test")
+
+ private def coalescedRDD(numIters: Int): Unit = {
+ val numBlocks = 100000
+ val benchmark = new Benchmark("Coalesced RDD", numBlocks, output = output)
+ for (numPartitions <- Seq(100, 500, 1000, 5000, 10000)) {
+ for (numHosts <- Seq(1, 5, 10, 20, 40, 80)) {
+
+ import collection.mutable
+ val hosts = mutable.ArrayBuffer[String]()
+ (1 to numHosts).foreach(hosts += "m" + _)
+ hosts.length
+ val rnd = scala.util.Random
+ rnd.setSeed(seed)
+ val blocks: immutable.Seq[(Int, Seq[String])] = (1 to numBlocks).map { i =>
+ (i, hosts(rnd.nextInt(hosts.size)) :: Nil)
+ }
+
+ benchmark.addCase(
+ s"Coalesce Num Partitions: $numPartitions Num Hosts: $numHosts",
+ numIters) { _ =>
+ performCoalesce(blocks, numPartitions)
+ }
+ }
+ }
+
+ benchmark.run()
+ }
+
+ private def performCoalesce(blocks: immutable.Seq[(Int, Seq[String])], numPartitions: Int) {
+ sc.makeRDD(blocks).coalesce(numPartitions).partitions
+ }
+
+ override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
+ val numIters = 3
+ runBenchmark("Coalesced RDD, large scale") {
+ coalescedRDD(numIters)
+ }
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]