Repository: spark
Updated Branches:
  refs/heads/master 2ce37b50f -> 42cf48e20


[SPARK-23496][CORE] Locality of coalesced partitions can be severely skewed by 
the order of input partitions

## What changes were proposed in this pull request?

The algorithm in `DefaultPartitionCoalescer.setupGroups` is responsible for 
picking preferred locations for coalesced partitions. It analyzes the preferred 
locations of input partitions. It starts by trying to create one partition for 
each unique location in the input. However, if the requested number of 
coalesced partitions is higher than the number of unique locations, it has to 
pick duplicate locations.

Previously, the duplicate locations would be picked by iterating over the input 
partitions in order, and copying their preferred locations to coalesced 
partitions. If the input partitions were clustered by location, this could 
result in severe skew.

With the fix, instead of iterating over the list of input partitions in order, 
we pick them at random. It's not perfectly balanced, but it's much better.

## How was this patch tested?

Unit test reproducing the behavior was added.

Author: Ala Luszczak <a...@databricks.com>

Closes #20664 from ala/SPARK-23496.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/42cf48e2
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/42cf48e2
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/42cf48e2

Branch: refs/heads/master
Commit: 42cf48e20cd5e47e1b7557af9c71c4eea142f10f
Parents: 2ce37b5
Author: Ala Luszczak <a...@databricks.com>
Authored: Mon Mar 5 14:33:12 2018 +0100
Committer: Herman van Hovell <hvanhov...@databricks.com>
Committed: Mon Mar 5 14:33:12 2018 +0100

----------------------------------------------------------------------
 .../org/apache/spark/rdd/CoalescedRDD.scala     |  8 ++--
 .../scala/org/apache/spark/rdd/RDDSuite.scala   | 42 ++++++++++++++++++++
 2 files changed, 46 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/42cf48e2/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
index 10451a3..94e7d0b 100644
--- a/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/CoalescedRDD.scala
@@ -266,17 +266,17 @@ private class DefaultPartitionCoalescer(val balanceSlack: 
Double = 0.10)
         numCreated += 1
       }
     }
-    tries = 0
     // if we don't have enough partition groups, create duplicates
     while (numCreated < targetLen) {
-      val (nxt_replica, nxt_part) = partitionLocs.partsWithLocs(tries)
-      tries += 1
+      // Copy the preferred location from a random input partition.
+      // This helps in avoiding skew when the input partitions are clustered 
by preferred location.
+      val (nxt_replica, nxt_part) = partitionLocs.partsWithLocs(
+        rnd.nextInt(partitionLocs.partsWithLocs.length))
       val pgroup = new PartitionGroup(Some(nxt_replica))
       groupArr += pgroup
       groupHash.getOrElseUpdate(nxt_replica, ArrayBuffer()) += pgroup
       addPartToPGroup(nxt_part, pgroup)
       numCreated += 1
-      if (tries >= partitionLocs.partsWithLocs.length) tries = 0
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/42cf48e2/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala 
b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index e994d72..191c612 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -1129,6 +1129,35 @@ class RDDSuite extends SparkFunSuite with 
SharedSparkContext {
     }.collect()
   }
 
+  test("SPARK-23496: order of input partitions can result in severe skew in 
coalesce") {
+    val numInputPartitions = 100
+    val numCoalescedPartitions = 50
+    val locations = Array("locA", "locB")
+
+    val inputRDD = sc.makeRDD(Range(0, numInputPartitions).toArray[Int], 
numInputPartitions)
+    assert(inputRDD.getNumPartitions == numInputPartitions)
+
+    val locationPrefRDD = new LocationPrefRDD(inputRDD, { (p: Partition) =>
+      if (p.index < numCoalescedPartitions) {
+        Seq(locations(0))
+      } else {
+        Seq(locations(1))
+      }
+    })
+    val coalescedRDD = new CoalescedRDD(locationPrefRDD, 
numCoalescedPartitions)
+
+    val numPartsPerLocation = coalescedRDD
+      .getPartitions
+      .map(coalescedRDD.getPreferredLocations(_).head)
+      .groupBy(identity)
+      .mapValues(_.size)
+
+    // Make sure the coalesced partitions are distributed fairly evenly 
between the two locations.
+    // This should not become flaky since the DefaultPartitionCoalescer uses 
a fixed seed.
+    assert(numPartsPerLocation(locations(0)) > 0.4 * numCoalescedPartitions)
+    assert(numPartsPerLocation(locations(1)) > 0.4 * numCoalescedPartitions)
+  }
+
   // NOTE
   // Below tests calling sc.stop() have to be the last tests in this suite. If 
there are tests
   // running after them and if they access sc those tests will fail as sc is 
already closed, because
@@ -1210,3 +1239,16 @@ class SizeBasedCoalescer(val maxSize: Int) extends 
PartitionCoalescer with Seria
     groups.toArray
   }
 }
+
+/** Alters the preferred locations of the parent RDD using provided function. 
*/
+class LocationPrefRDD[T: ClassTag](
+    @transient var prev: RDD[T],
+    val locationPicker: Partition => Seq[String]) extends RDD[T](prev) {
+  override protected def getPartitions: Array[Partition] = prev.partitions
+
+  override def compute(partition: Partition, context: TaskContext): 
Iterator[T] =
+    null.asInstanceOf[Iterator[T]]
+
+  override def getPreferredLocations(partition: Partition): Seq[String] =
+    locationPicker(partition)
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to