Repository: spark
Updated Branches:
  refs/heads/branch-1.3 dafb3d210 -> 5d309ad6c
[SPARK-5363] Fix bug in PythonRDD: remove() inside iterator is not safe

Removing elements from a mutable HashSet while iterating over it can cause the iteration to incorrectly skip over entries that were not removed. If this happened, PythonRDD would write fewer broadcast variables than the Python worker was expecting to read, which would cause the Python worker to hang indefinitely.

Author: Davies Liu <[email protected]>

Closes #4776 from davies/fix_hang and squashes the following commits:

a4384a5 [Davies Liu] fix bug: remvoe() inside iterator is not safe

(cherry picked from commit 7fa960e653a905fc48d4097b49ce560cff919fa2)
Signed-off-by: Josh Rosen <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/5d309ad6
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/5d309ad6
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/5d309ad6

Branch: refs/heads/branch-1.3
Commit: 5d309ad6c085b6e193771d1ecb9c52f8a55b21ef
Parents: dafb3d2
Author: Davies Liu <[email protected]>
Authored: Thu Feb 26 11:54:17 2015 -0800
Committer: Josh Rosen <[email protected]>
Committed: Thu Feb 26 11:56:36 2015 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/api/python/PythonRDD.scala | 13 ++++++-------
 1 file changed, 6 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/5d309ad6/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index dcb6e63..b1cec0f 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -219,14 +219,13 @@ private[spark] class PythonRDD(
           val oldBids = PythonRDD.getWorkerBroadcasts(worker)
           val newBids = broadcastVars.map(_.id).toSet
           // number of different broadcasts
-          val cnt = oldBids.diff(newBids).size + newBids.diff(oldBids).size
+          val toRemove = oldBids.diff(newBids)
+          val cnt = toRemove.size + newBids.diff(oldBids).size
           dataOut.writeInt(cnt)
-          for (bid <- oldBids) {
-            if (!newBids.contains(bid)) {
-              // remove the broadcast from worker
-              dataOut.writeLong(- bid - 1) // bid >= 0
-              oldBids.remove(bid)
-            }
+          for (bid <- toRemove) {
+            // remove the broadcast from worker
+            dataOut.writeLong(- bid - 1) // bid >= 0
+            oldBids.remove(bid)
           }
           for (broadcast <- broadcastVars) {
             if (!oldBids.contains(broadcast.id)) {
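The fix hinges on materializing the ids to remove (`toRemove`) before mutating `oldBids`, so the removal loop iterates over a separate copy instead of the set it is modifying. The count written up front then always matches the number of entries that follow, which is what the Python worker relies on. The Scala sketch below illustrates that pattern under stated assumptions: `BroadcastSync`, `writeBroadcastUpdates` and `readBroadcastUpdates` are hypothetical names, the addition branch is simplified to writing only the id (the diff does not show what PythonRDD actually sends for new broadcasts), and the reader is a Scala stand-in for the Python worker's loop, not Spark code.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import scala.collection.mutable

object BroadcastSync {
  // Hypothetical writer mirroring the fixed PythonRDD loop: writes the number of
  // changes, then one negative-encoded id per removal, then one id per addition.
  // Simplification (assumption): additions send only the id, no broadcast data.
  def writeBroadcastUpdates(
      oldBids: mutable.Set[Long],
      newBids: Set[Long],
      out: DataOutputStream): Int = {
    // Snapshot both differences BEFORE mutating oldBids; diff returns a new
    // collection, so the loops below never iterate over the set being modified.
    val toRemove = oldBids.diff(newBids)
    val toAdd = newBids.diff(oldBids)
    val cnt = toRemove.size + toAdd.size
    out.writeInt(cnt)
    for (bid <- toRemove) {
      out.writeLong(-bid - 1) // bid >= 0, so the encoded value is always < 0
      oldBids.remove(bid)     // safe: we are iterating over toRemove, not oldBids
    }
    for (bid <- toAdd) {
      out.writeLong(bid)
      oldBids.add(bid)
    }
    out.flush()
    cnt
  }

  // Hypothetical reader standing in for the worker side: it reads exactly `cnt`
  // entries, so a writer that sends fewer than `cnt` would leave it blocked.
  def readBroadcastUpdates(in: DataInputStream): (Set[Long], Set[Long]) = {
    val cnt = in.readInt()
    val removed = Set.newBuilder[Long]
    val added = Set.newBuilder[Long]
    for (_ <- 0 until cnt) {
      val v = in.readLong()
      // Negative values decode back to the removed id: -(-bid - 1) - 1 == bid
      if (v < 0) removed += (-v - 1) else added += v
    }
    (removed.result(), added.result())
  }

  def main(args: Array[String]): Unit = {
    val oldBids = mutable.HashSet[Long](0L, 1L, 2L, 3L, 4L)
    val newBids = Set[Long](3L, 4L, 5L)
    val buf = new ByteArrayOutputStream()
    val cnt = writeBroadcastUpdates(oldBids, newBids, new DataOutputStream(buf))
    val (removed, added) =
      readBroadcastUpdates(new DataInputStream(new ByteArrayInputStream(buf.toByteArray)))
    // prints: cnt=4 removed=List(0, 1, 2) added=List(5) now=List(3, 4, 5)
    println(s"cnt=$cnt removed=${removed.toList.sorted} added=${added.toList.sorted} now=${oldBids.toList.sorted}")
  }
}
```

Computing `toAdd` before any mutation matters for the same reason: once removals and additions start changing `oldBids`, any difference computed afterwards would no longer describe what the worker currently holds.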
