This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 4bab69b Revert "[SPARK-27070] Fix performance bug in DefaultPartitionCoalescer"
4bab69b is described below
commit 4bab69b22a50ae00b92ed6ab3b5120574dc3aa19
Author: Dongjoon Hyun <[email protected]>
AuthorDate: Fri Mar 15 14:56:08 2019 -0700
Revert "[SPARK-27070] Fix performance bug in DefaultPartitionCoalescer"
This reverts commit 21db4336b08fcb93779d72ebbb0251f3a2d08934.
---
core/benchmarks/CoalescedRDDBenchmark-results.txt | 40 -----------
.../scala/org/apache/spark/rdd/CoalescedRDD.scala | 34 ++++-----
.../apache/spark/rdd/CoalescedRDDBenchmark.scala | 80 ----------------------
3 files changed, 17 insertions(+), 137 deletions(-)
diff --git a/core/benchmarks/CoalescedRDDBenchmark-results.txt b/core/benchmarks/CoalescedRDDBenchmark-results.txt
deleted file mode 100644
index dd63b0a..0000000
--- a/core/benchmarks/CoalescedRDDBenchmark-results.txt
+++ /dev/null
@@ -1,40 +0,0 @@
-================================================================================================
-Coalesced RDD , large scale
-================================================================================================
-
-Java HotSpot(TM) 64-Bit Server VM 1.8.0_201-b09 on Windows 10 10.0
-Intel64 Family 6 Model 63 Stepping 2, GenuineIntel
-Coalesced RDD:                                Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
-----------------------------------------------------------------------------------------------------------------------------
-Coalesce Num Partitions: 100 Num Hosts: 1               346            364          24          0.3        3458.9       1.0X
-Coalesce Num Partitions: 100 Num Hosts: 5               258            264           6          0.4        2579.0       1.3X
-Coalesce Num Partitions: 100 Num Hosts: 10             242            249           7          0.4        2415.2       1.4X
-Coalesce Num Partitions: 100 Num Hosts: 20             237            242           7          0.4        2371.7       1.5X
-Coalesce Num Partitions: 100 Num Hosts: 40             230            231           1          0.4        2299.8       1.5X
-Coalesce Num Partitions: 100 Num Hosts: 80             222            233          14          0.4        2223.0       1.6X
-Coalesce Num Partitions: 500 Num Hosts: 1               659            665           5          0.2        6590.4       0.5X
-Coalesce Num Partitions: 500 Num Hosts: 5               340            381          47          0.3        3395.2       1.0X
-Coalesce Num Partitions: 500 Num Hosts: 10             279            307          47          0.4        2788.3       1.2X
-Coalesce Num Partitions: 500 Num Hosts: 20             259            261           2          0.4        2591.9       1.3X
-Coalesce Num Partitions: 500 Num Hosts: 40             241            250          15          0.4        2406.5       1.4X
-Coalesce Num Partitions: 500 Num Hosts: 80             235            237           3          0.4        2349.9       1.5X
-Coalesce Num Partitions: 1000 Num Hosts: 1            1050           1053           4          0.1       10503.2       0.3X
-Coalesce Num Partitions: 1000 Num Hosts: 5             405            407           2          0.2        4049.5       0.9X
-Coalesce Num Partitions: 1000 Num Hosts: 10            320            322           2          0.3        3202.7       1.1X
-Coalesce Num Partitions: 1000 Num Hosts: 20            276            277           0          0.4        2762.3       1.3X
-Coalesce Num Partitions: 1000 Num Hosts: 40            257            260           5          0.4        2571.2       1.3X
-Coalesce Num Partitions: 1000 Num Hosts: 80            245            252          13          0.4        2448.9       1.4X
-Coalesce Num Partitions: 5000 Num Hosts: 1            3099           3145          55          0.0       30988.6       0.1X
-Coalesce Num Partitions: 5000 Num Hosts: 5            1037           1050          20          0.1       10374.4       0.3X
-Coalesce Num Partitions: 5000 Num Hosts: 10            626            633           8          0.2        6261.8       0.6X
-Coalesce Num Partitions: 5000 Num Hosts: 20            426            431           5          0.2        4258.6       0.8X
-Coalesce Num Partitions: 5000 Num Hosts: 40            328            341          22          0.3        3275.4       1.1X
-Coalesce Num Partitions: 5000 Num Hosts: 80            272            275           4          0.4        2721.4       1.3X
-Coalesce Num Partitions: 10000 Num Hosts: 1           5516           5526           9          0.0       55156.8       0.1X
-Coalesce Num Partitions: 10000 Num Hosts: 5           1956           1992          48          0.1       19560.9       0.2X
-Coalesce Num Partitions: 10000 Num Hosts: 10          1045           1057          18          0.1       10447.4       0.3X
-Coalesce Num Partitions: 10000 Num Hosts: 20           637            658          24          0.2        6373.2       0.5X
-Coalesce Num Partitions: 10000 Num Hosts: 40           431            448          15          0.2        4312.9       0.8X
-Coalesce Num Partitions: 10000 Num Hosts: 80           326            328           2          0.3        3263.4       1.1X
-
-
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index e006f63..94e7d0b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -58,7 +58,7 @@ private[spark] case class CoalescedRDDPartition(
val parentPreferredLocations = rdd.context.getPreferredLocs(rdd, p.index).map(_.host)
preferredLocation.exists(parentPreferredLocations.contains)
}
- if (parents.isEmpty) 0.0 else loc.toDouble / parents.size.toDouble
+ if (parents.size == 0) 0.0 else (loc.toDouble / parents.size.toDouble)
}
}
@@ -91,7 +91,7 @@ private[spark] class CoalescedRDD[T: ClassTag](
pc.coalesce(maxPartitions, prev).zipWithIndex.map {
case (pg, i) =>
val ids = pg.partitions.map(_.index).toArray
- CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
+ new CoalescedRDDPartition(i, prev, ids, pg.prefLoc)
}
}
@@ -116,7 +116,7 @@ private[spark] class CoalescedRDD[T: ClassTag](
/**
* Returns the preferred machine for the partition. If split is of type CoalescedRDDPartition,
* then the preferred machine will be one which most parent splits prefer too.
- * @param partition the partition for which to retrieve the preferred machine, if exists
+ * @param partition
* @return the machine most preferred by split
*/
override def getPreferredLocations(partition: Partition): Seq[String] = {
@@ -156,11 +156,9 @@ private[spark] class CoalescedRDD[T: ClassTag](
private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
extends PartitionCoalescer {
-
- implicit val partitionGroupOrdering: Ordering[PartitionGroup] =
- (o1: PartitionGroup, o2: PartitionGroup) =>
- java.lang.Integer.compare(o1.numPartitions, o2.numPartitions)
-
+ def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean = o1.numPartitions < o2.numPartitions
+ def compare(o1: Option[PartitionGroup], o2: Option[PartitionGroup]): Boolean =
+ if (o1 == None) false else if (o2 == None) true else compare(o1.get, o2.get)
val rnd = new scala.util.Random(7919) // keep this class deterministic
@@ -180,7 +178,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
prev.context.getPreferredLocs(prev, part.index).map(tl => tl.host)
}
- private class PartitionLocations(prev: RDD[_]) {
+ class PartitionLocations(prev: RDD[_]) {
// contains all the partitions from the previous RDD that don't have preferred locations
val partsWithoutLocs = ArrayBuffer[Partition]()
@@ -215,14 +213,15 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
}
/**
- * Gets the least element of the list associated with key in groupHash
+ * Sorts and gets the least element of the list associated with key in groupHash
* The returned PartitionGroup is the least loaded of all groups that represent the machine "key"
*
* @param key string representing a partitioned group on preferred machine key
* @return Option of [[PartitionGroup]] that has least elements for key
*/
- def getLeastGroupHash(key: String): Option[PartitionGroup] =
- groupHash.get(key).filter(_.nonEmpty).map(_.min)
+ def getLeastGroupHash(key: String): Option[PartitionGroup] = {
+ groupHash.get(key).map(_.sortWith(compare).head)
+ }
def addPartToPGroup(part: Partition, pgroup: PartitionGroup): Boolean = {
if (!initialHash.contains(part)) {
@@ -237,12 +236,12 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
* is assigned a preferredLocation. This uses coupon collector to estimate how many
* preferredLocations it must rotate through until it has seen most of the preferred
* locations (2 * n log(n))
- * @param targetLen The number of desired partition groups
+ * @param targetLen
*/
def setupGroups(targetLen: Int, partitionLocs: PartitionLocations) {
// deal with empty case, just create targetLen partition groups with no preferred location
if (partitionLocs.partsWithLocs.isEmpty) {
- (1 to targetLen).foreach(_ => groupArr += new PartitionGroup())
+ (1 to targetLen).foreach(x => groupArr += new PartitionGroup())
return
}
@@ -298,8 +297,9 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
partitionLocs: PartitionLocations): PartitionGroup = {
val slack = (balanceSlack * prev.partitions.length).toInt
// least loaded pref locs
- val pref = currPrefLocs(p, prev).flatMap(getLeastGroupHash)
- val prefPart = if (pref.isEmpty) None else Some(pref.min)
+ val pref = currPrefLocs(p, prev).map(getLeastGroupHash(_)).sortWith(compare)
+ val prefPart = if (pref == Nil) None else pref.head
+
val r1 = rnd.nextInt(groupArr.size)
val r2 = rnd.nextInt(groupArr.size)
val minPowerOfTwo = {
@@ -351,7 +351,7 @@ private class DefaultPartitionCoalescer(val balanceSlack: Double = 0.10)
val partIter = partitionLocs.partsWithLocs.iterator
groupArr.filter(pg => pg.numPartitions == 0).foreach { pg =>
while (partIter.hasNext && pg.numPartitions == 0) {
- var (_, nxt_part) = partIter.next()
+ var (nxt_replica, nxt_part) = partIter.next()
if (!initialHash.contains(nxt_part)) {
pg.partitions += nxt_part
initialHash += nxt_part
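
For readers following the revert, the behavioral difference in DefaultPartitionCoalescer
is easiest to see side by side. The sketch below is a standalone illustration, not Spark
code: PartitionGroup here is a simplified stand-in for org.apache.spark.rdd.PartitionGroup
that keeps only its load, and the object name is made up. The reverted SPARK-27070 change
found the least-loaded group for a host with a single O(n) min scan, while the code
restored by this commit re-sorts the whole buffer on every lookup, O(n log n) per call.

    import scala.collection.mutable
    import scala.collection.mutable.ArrayBuffer

    object LeastLoadedGroupSketch {
      // Simplified stand-in: only the number of assigned partitions matters.
      final case class PartitionGroup(numPartitions: Int)

      val groupHash = mutable.Map[String, ArrayBuffer[PartitionGroup]]()

      // SPARK-27070 version (now reverted): one O(n) min scan per lookup.
      implicit val byLoad: Ordering[PartitionGroup] = Ordering.by(_.numPartitions)
      def leastGroupViaMin(key: String): Option[PartitionGroup] =
        groupHash.get(key).filter(_.nonEmpty).map(_.min)

      // Version restored by this commit: sorts the buffer on every lookup,
      // O(n log n) per call; note it also throws on an empty buffer, where
      // the min-based variant returns None.
      def compare(o1: PartitionGroup, o2: PartitionGroup): Boolean =
        o1.numPartitions < o2.numPartitions
      def leastGroupViaSort(key: String): Option[PartitionGroup] =
        groupHash.get(key).map(_.sortWith(compare).head)

      def main(args: Array[String]): Unit = {
        groupHash("m1") = ArrayBuffer(PartitionGroup(3), PartitionGroup(1), PartitionGroup(2))
        println(leastGroupViaMin("m1"))  // Some(PartitionGroup(1))
        println(leastGroupViaSort("m1")) // Some(PartitionGroup(1))
      }
    }

Both variants pick the same group; the cost difference only shows up at scale, which is
what the deleted benchmark above was measuring.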
diff --git a/core/src/test/scala/org/apache/spark/rdd/CoalescedRDDBenchmark.scala b/core/src/test/scala/org/apache/spark/rdd/CoalescedRDDBenchmark.scala
deleted file mode 100644
index 42b3070..0000000
--- a/core/src/test/scala/org/apache/spark/rdd/CoalescedRDDBenchmark.scala
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.rdd
-
-import scala.collection.immutable
-
-import org.apache.spark.SparkContext
-import org.apache.spark.benchmark.{Benchmark, BenchmarkBase}
-
-/**
- * Benchmark for CoalescedRDD.
- * Measures rdd.coalesce performance under various combinations of
- * coalesced partitions and preferred hosts
- * To run this benchmark:
- * {{{
- * 1. without sbt:
- * bin/spark-submit --class <this class> --jars <spark core test jar>
- * 2. build/sbt "core/test:runMain <this class>"
- * 3. generate result:
- * SPARK_GENERATE_BENCHMARK_FILES=1 build/sbt "core/test:runMain <this class>"
- * Results will be written to "benchmarks/CoalescedRDD-results.txt".
- * }}}
- * */
-object CoalescedRDDBenchmark extends BenchmarkBase {
- val seed = 0x1337
- val sc = new SparkContext(master = "local[4]", appName = "test")
-
- private def coalescedRDD(numIters: Int): Unit = {
- val numBlocks = 100000
- val benchmark = new Benchmark("Coalesced RDD", numBlocks, output = output)
- for (numPartitions <- Seq(100, 500, 1000, 5000, 10000)) {
- for (numHosts <- Seq(1, 5, 10, 20, 40, 80)) {
-
- import collection.mutable
- val hosts = mutable.ArrayBuffer[String]()
- (1 to numHosts).foreach(hosts += "m" + _)
- hosts.length
- val rnd = scala.util.Random
- rnd.setSeed(seed)
- val blocks: immutable.Seq[(Int, Seq[String])] = (1 to numBlocks).map { i =>
- (i, hosts(rnd.nextInt(hosts.size)) :: Nil)
- }
-
- benchmark.addCase(
- s"Coalesce Num Partitions: $numPartitions Num Hosts: $numHosts",
- numIters) { _ =>
- performCoalesce(blocks, numPartitions)
- }
- }
- }
-
- benchmark.run()
- }
-
- private def performCoalesce(blocks: immutable.Seq[(Int, Seq[String])], numPartitions: Int) {
- sc.makeRDD(blocks).coalesce(numPartitions).partitions
- }
-
- override def runBenchmarkSuite(mainArgs: Array[String]): Unit = {
- val numIters = 3
- runBenchmark("Coalesced RDD , large scale") {
- coalescedRDD(numIters)
- }
- }
-}
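
The deleted benchmark can only be run from the Spark test tree, since it extends the
internal BenchmarkBase harness. As a rough standalone substitute, the sketch below is an
illustration, not part of this commit: it assumes only spark-core on the classpath, and
the class name and host names are made up. It times the same rdd.coalesce partition
computation, mirroring the deleted file's block layout and 0x1337 seed.

    import org.apache.spark.SparkContext

    object CoalesceTimingSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(master = "local[4]", appName = "coalesce-timing")
        val numBlocks = 100000
        val numHosts = 10
        val rnd = new scala.util.Random(0x1337)
        // (value, preferred hosts) pairs, as in the deleted benchmark's blocks.
        val blocks = (1 to numBlocks).map(i => (i, Seq("m" + (rnd.nextInt(numHosts) + 1))))
        val start = System.nanoTime()
        // Accessing `partitions` forces DefaultPartitionCoalescer to run.
        sc.makeRDD(blocks).coalesce(1000).partitions
        val elapsedMs = (System.nanoTime() - start) / 1e6
        println(f"coalesce to 1000 partitions over $numHosts hosts: $elapsedMs%.1f ms")
        sc.stop()
      }
    }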
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]