[ https://issues.apache.org/jira/browse/SPARK-11711?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Sean Owen resolved SPARK-11711.
-------------------------------
    Resolution: Duplicate

Merging this into the other issue, which has slightly more discussion of the same problem.

> Finalizer memory leak in pyspark
> --------------------------------
>
>                 Key: SPARK-11711
>                 URL: https://issues.apache.org/jira/browse/SPARK-11711
>             Project: Spark
>          Issue Type: Bug
>          Components: PySpark
>    Affects Versions: 1.5.1
>            Reporter: David Watson
>
> I've been seeing very consistent memory leaks in the Java process of Python 
> Spark streaming scripts on my driver.  A heap profile analysis showed 
> millions of Finalizer objects.
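> For what it's worth, the pile-up can also be watched without a full heap dump 
> by asking the JVM's MemoryMXBean how many objects are queued for finalization 
> (a minimal sketch; it assumes the PySpark-internal SparkContext._gateway has 
> already been initialized, and a steadily growing count suggests the 
> FinalizerThread is stuck):
> """
> from pyspark import SparkContext
>
> def pending_finalization_count():
>     # MemoryMXBean#getObjectPendingFinalizationCount reports the
>     # approximate number of objects awaiting finalization.  This call
>     # travels over the main gateway connection, so it still works while
>     # the callback connection is wedged.
>     jvm = SparkContext._gateway.jvm
>     bean = jvm.java.lang.management.ManagementFactory.getMemoryMXBean()
>     return bean.getObjectPendingFinalizationCount()
> """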
> The Spark web UI's Executor Thread Dump shows:
> Thread 3: Finalizer (WAITING):
> java.net.SocketInputStream.socketRead0(Native Method)
> java.net.SocketInputStream.read(SocketInputStream.java:152)
> java.net.SocketInputStream.read(SocketInputStream.java:122)
> sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283)
> sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325)
> sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177)
> java.io.InputStreamReader.read(InputStreamReader.java:184)
> java.io.BufferedReader.fill(BufferedReader.java:154)
> java.io.BufferedReader.readLine(BufferedReader.java:317)
> java.io.BufferedReader.readLine(BufferedReader.java:382)
> py4j.CallbackConnection.sendCommand(CallbackConnection.java:82)
> py4j.CallbackClient.sendCommand(CallbackClient.java:236)
> py4j.reflection.PythonProxyHandler.finalize(PythonProxyHandler.java:81)
> java.lang.System$2.invokeFinalize(System.java:1213)
> java.lang.ref.Finalizer.runFinalizer(Finalizer.java:98)
> java.lang.ref.Finalizer.access$100(Finalizer.java:34)
> java.lang.ref.Finalizer$FinalizerThread.run(Finalizer.java:210)
> The problem appears to be in py4j.  I don't have a proper patch because the 
> bug is inside the bundled python/lib/py4j-0.8.2.1-src.zip.  I've 
> monkey-patched it, and that appears to fix the problem.
> In py4j.java_gateway.CallbackConnection.run() (line 1186):
> """
>                 elif command == GARBAGE_COLLECT_PROXY_COMMAND_NAME:
>                     self.input.readline()
>                     del(self.pool[obj_id])
> """
> NOTE: it doesn't write a response to the socket!
> And on the Java side, in CallbackConnection.sendCommand() (CallbackConnection.java:82):
> """
>                       returnCommand = reader.readLine();
> """
> I don't know what the protocol is supposed to be, but the Java side expects a 
> response that the Python side never sends.  As you can see from the stack 
> trace, this jams up the Java FinalizerThread, which prevents anything from 
> being finalized, Spark-related or not.
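> The shape of the deadlock is easy to reproduce outside of py4j entirely (a 
> toy sketch, not py4j code: one end writes a command and blocks reading a 
> reply line, the other end reads the command and never answers):
> """
> import socket
> import threading
>
> def python_side(conn):
>     # Like the GARBAGE_COLLECT branch of CallbackConnection.run before
>     # the fix: read the command, do the work, send no reply.
>     conn.makefile("rb").readline()
>
> def java_side(sock):
>     # Like CallbackConnection.sendCommand: write the command, then block
>     # waiting for a reply line that never arrives.
>     sock.sendall(b"gsome_proxy_id\n")
>     sock.settimeout(2)  # the real readLine() has no timeout at all
>     try:
>         sock.recv(1024)
>     except socket.timeout:
>         print("no reply -- this is where the FinalizerThread hangs")
>
> java_end, python_end = socket.socketpair()
> threading.Thread(target=python_side, args=(python_end,)).start()
> java_side(java_end)
> """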
> My monkey patch to py4j.java_gateway.CallbackConnection.run() (line 1186):
> """
>                 elif command == GARBAGE_COLLECT_PROXY_COMMAND_NAME:
>                     self.input.readline()
>                     del(self.pool[obj_id])
> +                    ## PATCH: send an empty response!
> +                    self.socket.sendall("\n")
> +                    ##
> """
> This bug appears to exist in current py4j as well, but I can't find the 
> repository for the 0.8.2.1 version embedded in Spark.
> I'm not entirely sure, but I suspect that (at least on the driver) this 
> doesn't normally bite because the Java object references held by Python are 
> long-lived, so the FinalizerThread doesn't get jammed up until the program 
> ends.
> My code peeks at the checkpoint file (code below) before starting the 
> script, which seems to jam things up right at the beginning, and any other 
> finalizable objects (Scala? Java?) pile up behind it.
> """
> def loadCheckpoint(checkpointPath):
>     StreamingContext._ensure_initialized()
>     gw = SparkContext._gateway
>     # Check whether valid checkpoint information exists in the given path
>     cp = gw.jvm.CheckpointReader.read(checkpointPath)
>     assert cp is not None, "Couldn't load %s" % checkpointPath
>     return cp.get()
> """
> At any rate, I can confirm that the same situation exists on the worker nodes 
> as well as on the driver, and that this fixes both.


