Repository: spark
Updated Branches:
  refs/heads/branch-1.2 015895ab5 -> cc7313d09


[SPARK-5363] Fix bug in PythonRDD: remove() inside iterator is not safe

Removing elements from a mutable HashSet while iterating over it can cause the
iteration to incorrectly skip over entries that were not removed. If this
happened, PythonRDD would write fewer broadcast variables than the Python
worker was expecting to read, which would cause the Python worker to hang
indefinitely.

Author: Davies Liu <[email protected]>

Closes #4776 from davies/fix_hang and squashes the following commits:

a4384a5 [Davies Liu] fix bug: remove() inside iterator is not safe

(cherry picked from commit 7fa960e653a905fc48d4097b49ce560cff919fa2)
Signed-off-by: Josh Rosen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/cc7313d0
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/cc7313d0
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/cc7313d0

Branch: refs/heads/branch-1.2
Commit: cc7313d09777f7749c2588e5bc50096a9762c0ce
Parents: 015895a
Author: Davies Liu <[email protected]>
Authored: Thu Feb 26 11:54:17 2015 -0800
Committer: Josh Rosen <[email protected]>
Committed: Thu Feb 26 11:57:07 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/api/python/PythonRDD.scala  | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/cc7313d0/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 0d508d6..bfd36c7 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -220,14 +220,13 @@ private[spark] class PythonRDD(
         val oldBids = PythonRDD.getWorkerBroadcasts(worker)
         val newBids = broadcastVars.map(_.id).toSet
         // number of different broadcasts
-        val cnt = oldBids.diff(newBids).size + newBids.diff(oldBids).size
+        val toRemove = oldBids.diff(newBids)
+        val cnt = toRemove.size + newBids.diff(oldBids).size
         dataOut.writeInt(cnt)
-        for (bid <- oldBids) {
-          if (!newBids.contains(bid)) {
-            // remove the broadcast from worker
-            dataOut.writeLong(- bid - 1)  // bid >= 0
-            oldBids.remove(bid)
-          }
+        for (bid <- toRemove) {
+          // remove the broadcast from worker
+          dataOut.writeLong(- bid - 1)  // bid >= 0
+          oldBids.remove(bid)
         }
         for (broadcast <- broadcastVars) {
           if (!oldBids.contains(broadcast.id)) {
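
The pattern in the hunk above can be tried in isolation. The following is a
minimal sketch, not the actual PythonRDD code: the object name BroadcastSync,
the helper removeStale, and the emit callback (standing in for
dataOut.writeLong) are hypothetical names introduced only for illustration.
It shows the idea of the fix: take the stale ids as a separate set via diff,
then iterate over that snapshot while mutating the original set.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the patched loop; not part of Spark.
object BroadcastSync {
  // Removes every id in oldBids that is absent from newBids and reports each
  // removal through `emit` (assumed here to play the role of dataOut.writeLong).
  // Returns the number of ids removed.
  def removeStale(oldBids: mutable.Set[Long], newBids: Set[Long])(emit: Long => Unit): Int = {
    // diff builds a new set, so the loop below never iterates over oldBids itself.
    val toRemove = oldBids.diff(newBids)
    for (bid <- toRemove) {
      emit(-bid - 1) // same negative encoding as in the patch; assumes bid >= 0
      oldBids.remove(bid)
    }
    toRemove.size
  }

  def main(args: Array[String]): Unit = {
    val oldBids = mutable.HashSet[Long](0L, 1L, 2L, 3L)
    val newBids = Set[Long](2L, 3L, 4L)
    val sent = mutable.ArrayBuffer.empty[Long]
    val removed = removeStale(oldBids, newBids) { v => sent += v; () }
    println(s"removed=$removed remaining=${oldBids.toList.sorted} sent=${sent.toList.sorted}")
  }
}
```

Because the count is taken from the same snapshot that drives the loop, the
number announced to the reader and the number of removal records written
cannot drift apart, which is the mismatch the commit message describes.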

