Repository: spark
Updated Branches:
  refs/heads/master 5c47db065 -> 08db49126


[SPARK-9926] Parallelize partition logic in UnionRDD.

This patch carries over the logic from #8512 that uses a parallel collection to 
compute partitions in UnionRDD. The rest of #8512 added an alternative code 
path for calculating splits in S3, but that isn't necessary to get the same 
speedup: the underlying problem wasn't a lack of bulk listing, it was that an 
extra FileStatus was retrieved for each file. That fix was just committed as 
[HADOOP-12810](https://issues.apache.org/jira/browse/HADOOP-12810). (I think 
the original commit also used a single prefix to enumerate all paths, but that 
isn't always helpful, and it was removed in later versions, so there is no need 
for SparkS3Utils.)
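
For context, a minimal sketch of the technique the diff below applies: backing 
a parallel collection with an explicit fork-join pool so that each RDD's 
(potentially slow) partition listing runs concurrently. The `totalPartitions` 
helper is hypothetical; the pool size of 8 and the `.seq.sum` pattern mirror 
the actual change.

```scala
import scala.collection.parallel.ForkJoinTaskSupport
import scala.concurrent.forkjoin.ForkJoinPool

import org.apache.spark.rdd.RDD

// Hypothetical helper: compute the total partition count of many RDDs by
// evaluating _.partitions in parallel. Each call may trigger expensive
// filesystem listing (e.g. against S3), so parallelism hides the latency.
def totalPartitions(rdds: Seq[RDD[_]]): Int = {
  val parRdds = rdds.par
  // Bound concurrency with a dedicated pool (the patch uses 8 threads).
  parRdds.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(8))
  parRdds.map(_.partitions.length).seq.sum
}
```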

I tested this using the same table that piapiaozhexiu was using. Calculating 
splits for a 10-day period took 25 seconds with this change and HADOOP-12810, 
which is on par with the results from #8512.

Author: Ryan Blue <b...@apache.org>
Author: Cheolsoo Park <cheols...@netflix.com>

Closes #11242 from rdblue/SPARK-9926-parallelize-union-rdd.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/08db4912
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/08db4912
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/08db4912

Branch: refs/heads/master
Commit: 08db491265a3b50e31993ac6aa07c3f0dd08cdbb
Parents: 5c47db0
Author: Ryan Blue <b...@apache.org>
Authored: Thu May 5 14:40:37 2016 -0700
Committer: Andrew Or <and...@databricks.com>
Committed: Thu May 5 14:40:37 2016 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/UnionRDD.scala     | 18 +++++++++++++++++-
 .../scala/org/apache/spark/rdd/RDDSuite.scala     | 17 +++++++++++++++++
 2 files changed, 34 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/08db4912/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index 66cf436..8171dcc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -20,6 +20,8 @@ package org.apache.spark.rdd
 import java.io.{IOException, ObjectOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
+import scala.collection.parallel.ForkJoinTaskSupport
+import scala.concurrent.forkjoin.ForkJoinPool
 import scala.reflect.ClassTag
 
 import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, TaskContext}
@@ -62,8 +64,22 @@ class UnionRDD[T: ClassTag](
     var rdds: Seq[RDD[T]])
   extends RDD[T](sc, Nil) {  // Nil since we implement getDependencies
 
+  // visible for testing
+  private[spark] val isPartitionListingParallel: Boolean =
+    rdds.length > conf.getInt("spark.rdd.parallelListingThreshold", 10)
+
+  @transient private lazy val partitionEvalTaskSupport =
+      new ForkJoinTaskSupport(new ForkJoinPool(8))
+
   override def getPartitions: Array[Partition] = {
-    val array = new Array[Partition](rdds.map(_.partitions.length).sum)
+    val parRDDs = if (isPartitionListingParallel) {
+      val parArray = rdds.par
+      parArray.tasksupport = partitionEvalTaskSupport
+      parArray
+    } else {
+      rdds
+    }
+    val array = new Array[Partition](parRDDs.map(_.partitions.length).seq.sum)
     var pos = 0
     for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
       array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)

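
As a usage note (an assumed application-side sketch, not part of this commit): 
the new `spark.rdd.parallelListingThreshold` setting controls when the parallel 
path engages, so a job that unions many inputs could lower it explicitly.

```scala
import org.apache.spark.SparkConf

// Assumed application code: with the default threshold of 10, a union of
// 5 RDDs lists partitions serially; lowering the threshold to 4 makes the
// same union take the parallel path.
val conf = new SparkConf()
  .setAppName("many-input-union")
  .set("spark.rdd.parallelListingThreshold", "4")
```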
http://git-wip-us.apache.org/repos/asf/spark/blob/08db4912/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index a663dab..979fb42 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -116,6 +116,23 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
     assert(sc.union(Seq(nums, nums)).collect().toList === List(1, 2, 3, 4, 1, 2, 3, 4))
   }
 
+  test("SparkContext.union parallel partition listing") {
+    val nums1 = sc.makeRDD(Array(1, 2, 3, 4), 2)
+    val nums2 = sc.makeRDD(Array(5, 6, 7, 8), 2)
+    val serialUnion = sc.union(nums1, nums2)
+    val expected = serialUnion.collect().toList
+
+    assert(serialUnion.asInstanceOf[UnionRDD[Int]].isPartitionListingParallel === false)
+
+    sc.conf.set("spark.rdd.parallelListingThreshold", "1")
+    val parallelUnion = sc.union(nums1, nums2)
+    val actual = parallelUnion.collect().toList
+    sc.conf.remove("spark.rdd.parallelListingThreshold")
+
+    assert(parallelUnion.asInstanceOf[UnionRDD[Int]].isPartitionListingParallel === true)
+    assert(expected === actual)
+  }
+
   test("SparkContext.union creates UnionRDD if at least one RDD has no 
partitioner") {
     val rddWithPartitioner = sc.parallelize(Seq(1 -> true)).partitionBy(new HashPartitioner(1))
     val rddWithNoPartitioner = sc.parallelize(Seq(2 -> true))

