Repository: spark
Updated Branches:
  refs/heads/branch-1.0 a314342da -> d2cbd3d76
SPARK-1700: Close socket file descriptors on task completion

This will ensure that sockets do not build up over the course of a job, and that cancellation successfully cleans up sockets.

Tested in standalone mode. More file descriptors are created than expected (around 1000 rather than the expected ~8), but they do not pile up between runs, nor do they climb as high as before (around 5k).

Author: Aaron Davidson <[email protected]>

Closes #623 from aarondav/pyspark2 and squashes the following commits:

0ca13bb [Aaron Davidson] SPARK-1700: Close socket file descriptors on task completion

(cherry picked from commit 0a14421765b672305e8f32ded4a9a1f6f7241d8d)
Signed-off-by: Aaron Davidson <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/d2cbd3d7
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/d2cbd3d7
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/d2cbd3d7

Branch: refs/heads/branch-1.0
Commit: d2cbd3d766ac96de75f9b519696a83a9b810e21c
Parents: a314342
Author: Aaron Davidson <[email protected]>
Authored: Fri May 2 23:55:13 2014 -0700
Committer: Aaron Davidson <[email protected]>
Committed: Sat May 3 00:12:09 2014 -0700

----------------------------------------------------------------------
 .../scala/org/apache/spark/api/python/PythonRDD.scala | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/d2cbd3d7/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
index 672c344..6140700 100644
--- a/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
+++ b/core/src/main/scala/org/apache/spark/api/python/PythonRDD.scala
@@ -54,7 +54,16 @@ private[spark] class PythonRDD[T: ClassTag](
   override def compute(split: Partition, context: TaskContext): Iterator[Array[Byte]] = {
     val startTime = System.currentTimeMillis
     val env = SparkEnv.get
-    val worker = env.createPythonWorker(pythonExec, envVars.toMap)
+    val worker: Socket = env.createPythonWorker(pythonExec, envVars.toMap)
+
+    // Ensure worker socket is closed on task completion. Closing sockets is idempotent.
+    context.addOnCompleteCallback(() =>
+      try {
+        worker.close()
+      } catch {
+        case e: Exception => logWarning("Failed to close worker socket", e)
+      }
+    )
 
     @volatile var readerException: Exception = null
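
The change's comment leans on Socket.close() being idempotent, so the completion callback is safe even if the socket was already closed on the normal path. Below is a minimal, self-contained sketch in plain Scala (not Spark code; addOnCompleteCallback and markTaskCompleted here are hypothetical stand-ins for TaskContext's completion-callback machinery) of that register-a-cleanup-callback pattern:

import java.net.Socket
import scala.collection.mutable.ArrayBuffer

object SocketCleanupSketch {
  // Hypothetical stand-in for TaskContext.addOnCompleteCallback: collects callbacks
  // and runs them all when the "task" completes.
  private val onComplete = ArrayBuffer.empty[() => Unit]
  def addOnCompleteCallback(f: () => Unit): Unit = onComplete += f
  def markTaskCompleted(): Unit = onComplete.foreach(_.apply())

  def main(args: Array[String]): Unit = {
    val socket = new Socket() // unconnected socket; enough to demonstrate close()
    addOnCompleteCallback { () =>
      try socket.close()
      catch { case e: Exception => System.err.println(s"Failed to close socket: $e") }
    }
    socket.close()      // first close, e.g. from the normal success path
    markTaskCompleted() // second close via the callback is harmless: Socket.close() is idempotent
  }
}

Because the callback always fires at task completion, the socket's file descriptor is released on success, failure, or cancellation alike, which is what keeps descriptors from accumulating across runs.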
