Repository: spark
Updated Branches:
  refs/heads/master 7fc8881b0 -> 32da87dfa


[SPARK-25286][CORE] Removing the dangerous parmap

## What changes were proposed in this pull request?

I propose to remove the `parmap` overload that accepts an execution context 
as a parameter. The method should be removed to eliminate deadlocks that 
can occur if `parmap` is called recursively on thread pools restricted by size.

Closes #22292 from MaxGekk/remove-overloaded-parmap.

Authored-by: Maxim Gekk <maxim.g...@databricks.com>
Signed-off-by: Xiao Li <gatorsm...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/32da87df
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/32da87df
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/32da87df

Branch: refs/heads/master
Commit: 32da87dfa451fff677ed9316f740be2abdbff6a4
Parents: 7fc8881
Author: Maxim Gekk <maxim.g...@databricks.com>
Authored: Fri Aug 31 10:43:30 2018 -0700
Committer: Xiao Li <gatorsm...@gmail.com>
Committed: Fri Aug 31 10:43:30 2018 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/rdd/UnionRDD.scala   | 17 ++++++-----
 .../org/apache/spark/util/ThreadUtils.scala     | 32 +++-----------------
 .../streaming/util/FileBasedWriteAheadLog.scala |  5 +--
 3 files changed, 16 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/32da87df/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala 
b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
index 4b6f732..60e383a 100644
--- a/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/UnionRDD.scala
@@ -20,13 +20,12 @@ package org.apache.spark.rdd
 import java.io.{IOException, ObjectOutputStream}
 
 import scala.collection.mutable.ArrayBuffer
-import scala.concurrent.ExecutionContext
+import scala.collection.parallel.ForkJoinTaskSupport
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.reflect.ClassTag
 
 import org.apache.spark.{Dependency, Partition, RangeDependency, SparkContext, 
TaskContext}
 import org.apache.spark.annotation.DeveloperApi
-import org.apache.spark.util.ThreadUtils.parmap
 import org.apache.spark.util.Utils
 
 /**
@@ -60,7 +59,8 @@ private[spark] class UnionPartition[T: ClassTag](
 }
 
 object UnionRDD {
-  private[spark] lazy val threadPool = new ForkJoinPool(8)
+  private[spark] lazy val partitionEvalTaskSupport =
+    new ForkJoinTaskSupport(new ForkJoinPool(8))
 }
 
 @DeveloperApi
@@ -74,13 +74,14 @@ class UnionRDD[T: ClassTag](
     rdds.length > conf.getInt("spark.rdd.parallelListingThreshold", 10)
 
   override def getPartitions: Array[Partition] = {
-    val partitionLengths = if (isPartitionListingParallel) {
-      implicit val ec = ExecutionContext.fromExecutor(UnionRDD.threadPool)
-      parmap(rdds)(_.partitions.length)
+    val parRDDs = if (isPartitionListingParallel) {
+      val parArray = rdds.par
+      parArray.tasksupport = UnionRDD.partitionEvalTaskSupport
+      parArray
     } else {
-      rdds.map(_.partitions.length)
+      rdds
     }
-    val array = new Array[Partition](partitionLengths.sum)
+    val array = new Array[Partition](parRDDs.map(_.partitions.length).seq.sum)
     var pos = 0
     for ((rdd, rddIndex) <- rdds.zipWithIndex; split <- rdd.partitions) {
       array(pos) = new UnionPartition(pos, rdd, rddIndex, split.index)

http://git-wip-us.apache.org/repos/asf/spark/blob/32da87df/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala 
b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
index f0e5add..cb0c205 100644
--- a/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/ThreadUtils.scala
@@ -284,36 +284,12 @@ private[spark] object ThreadUtils {
     try {
       implicit val ec = ExecutionContext.fromExecutor(pool)
 
-      parmap(in)(f)
+      val futures = in.map(x => Future(f(x)))
+      val futureSeq = Future.sequence(futures)
+
+      awaitResult(futureSeq, Duration.Inf)
     } finally {
       pool.shutdownNow()
     }
   }
-
-  /**
-   * Transforms input collection by applying the given function to each 
element in parallel fashion.
-   * Comparing to the map() method of Scala parallel collections, this method 
can be interrupted
-   * at any time. This is useful on canceling of task execution, for example.
-   *
-   * @param in - the input collection which should be transformed in parallel.
-   * @param f - the lambda function will be applied to each element of `in`.
-   * @param ec - an execution context for parallel applying of the given 
function `f`.
-   * @tparam I - the type of elements in the input collection.
-   * @tparam O - the type of elements in resulted collection.
-   * @return new collection in which each element was given from the input 
collection `in` by
-   *         applying the lambda function `f`.
-   */
-  def parmap[I, O, Col[X] <: TraversableLike[X, Col[X]]]
-      (in: Col[I])
-      (f: I => O)
-      (implicit
-        cbf: CanBuildFrom[Col[I], Future[O], Col[Future[O]]], // For in.map
-        cbf2: CanBuildFrom[Col[Future[O]], O, Col[O]], // for Future.sequence
-        ec: ExecutionContext
-      ): Col[O] = {
-    val futures = in.map(x => Future(f(x)))
-    val futureSeq = Future.sequence(futures)
-
-    awaitResult(futureSeq, Duration.Inf)
-  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/32da87df/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
----------------------------------------------------------------------
diff --git 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
index bba071e..f0161e1 100644
--- 
a/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
+++ 
b/streaming/src/main/scala/org/apache/spark/streaming/util/FileBasedWriteAheadLog.scala
@@ -312,10 +312,11 @@ private[streaming] object FileBasedWriteAheadLog {
       handler: I => Iterator[O]): Iterator[O] = {
     val taskSupport = new ExecutionContextTaskSupport(executionContext)
     val groupSize = taskSupport.parallelismLevel.max(8)
-    implicit val ec = executionContext
 
     source.grouped(groupSize).flatMap { group =>
-      ThreadUtils.parmap(group)(handler)
+      val parallelCollection = group.par
+      parallelCollection.tasksupport = taskSupport
+      parallelCollection.map(handler)
     }.flatten
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@spark.apache.org
For additional commands, e-mail: commits-h...@spark.apache.org

Reply via email to