Repository: spark
Updated Branches:
  refs/heads/branch-1.0 cf1d46e46 -> 313f202e2


SPARK-2282: Reuse PySpark Accumulator sockets to avoid crashing Spark

JIRA: https://issues.apache.org/jira/browse/SPARK-2282

This issue is caused by a buildup of sockets in the TCP TIME_WAIT state, 
which a socket remains in for some period of time after its connection 
closes.

This solution simply allows us to reuse sockets that are in TIME_WAIT, which 
avoids the buildup caused by creating these sockets in rapid succession.

Author: Aaron Davidson <[email protected]>

Closes #1220 from aarondav/SPARK-2282 and squashes the following commits:

2e5cab3 [Aaron Davidson] SPARK-2282: Reuse PySpark Accumulator sockets to avoid crashing Spark
(cherry picked from commit 97a0bfe1c0261384f09d53f9350de52fb6446d59)

Signed-off-by: Patrick Wendell <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/313f202e
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/313f202e
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/313f202e

Branch: refs/heads/branch-1.0
Commit: 313f202e27878bb9a1ec425defd248203bc73c5f
Parents: cf1d46e
Author: Aaron Davidson <[email protected]>
Authored: Thu Jul 3 23:02:36 2014 -0700
Committer: Patrick Wendell <[email protected]>
Committed: Thu Jul 3 23:02:47 2014 -0700

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/313f202e/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 57b28b9..0217a58 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -402,6 +402,8 @@ private class PythonAccumulatorParam(@transient serverHost: String, serverPort:
     } else {
       // This happens on the master, where we pass the updates to Python through a socket
       val socket = new Socket(serverHost, serverPort)
+      // SPARK-2282: Immediately reuse closed sockets because we create one per task.
+      socket.setReuseAddress(true)
       val in = socket.getInputStream
       val out = new DataOutputStream(new BufferedOutputStream(socket.getOutputStream, bufferSize))
       out.writeInt(val2.size)
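
For illustration only, here is a minimal standalone sketch of the same idea 
outside Spark: one short-lived client socket per update, each with address 
reuse requested. It is not the code in this commit. The object name 
ReuseAddressSketch and the helpers connectWithReuse and roundTrip are 
hypothetical. The sketch also assumes a variation: it requests the option on 
an unconnected socket before connecting, because the Java documentation 
leaves the effect of changing SO_REUSEADDR on an already bound socket 
undefined.

```scala
import java.io.{BufferedOutputStream, DataInputStream, DataOutputStream}
import java.net.{InetSocketAddress, ServerSocket, Socket}

object ReuseAddressSketch {
  // Hypothetical helper (not part of Spark): open a client socket with
  // SO_REUSEADDR requested before the connection is established.
  def connectWithReuse(host: String, port: Int): Socket = {
    val socket = new Socket()      // unconnected and unbound
    socket.setReuseAddress(true)   // request address reuse before connect
    socket.connect(new InetSocketAddress(host, port))
    socket
  }

  // Sends each value over its own short-lived connection, mimicking the
  // one-socket-per-task pattern, and returns what the server side read.
  def roundTrip(values: Seq[Int]): Seq[Int] = {
    val server = new ServerSocket(0) // ephemeral local port
    try {
      values.map { v =>
        val client = connectWithReuse("127.0.0.1", server.getLocalPort)
        try {
          val out = new DataOutputStream(
            new BufferedOutputStream(client.getOutputStream))
          out.writeInt(v)
          out.flush()
          val accepted = server.accept()
          try new DataInputStream(accepted.getInputStream).readInt()
          finally accepted.close()
        } finally client.close()
      }
    } finally server.close()
  }

  def main(args: Array[String]): Unit = {
    println(roundTrip(Seq(1, 2, 3)))
  }
}
```

Whether reuse actually prevents the buildup depends on the operating 
system's handling of TIME_WAIT, so treat this as a sketch of the technique 
rather than a guaranteed fix.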
