Thanks for looking into it. I like the idea of having a ForkingIterator. If we give it an unlimited buffer, then I think we avoid the deadlock problem. The writing thread will be blocked by the Python process, so there should not be many rows buffered (though that buffering could still cause an OOM). At least this approach is better than the current one.
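For concreteness, a minimal sketch of what such a ForkingIterator might look like (hypothetical Scala, not the actual patch): the primary iterator passes rows through while copying each one into an unbounded LinkedBlockingDeque that a second consumer drains.

    import java.util.concurrent.LinkedBlockingDeque

    class ForkingIterator[T](source: Iterator[T]) extends Iterator[T] {
      // Unbounded by default, so the writing side never blocks on put():
      // no deadlock, but rows pile up here if the fork lags (the OOM risk).
      private val buffer = new LinkedBlockingDeque[Option[T]]()
      private var sentinelSent = false

      override def hasNext: Boolean = {
        val more = source.hasNext
        if (!more && !sentinelSent) {
          sentinelSent = true
          buffer.put(None) // end-of-stream marker for the fork
        }
        more
      }

      override def next(): T = {
        val elem = source.next()
        buffer.put(Some(elem)) // never blocks: the deque is unbounded
        elem
      }

      // Second consumer: sees every element the primary side pulls through.
      val fork: Iterator[T] = new Iterator[T] {
        private var lookahead: Option[T] = None
        private var exhausted = false

        override def hasNext: Boolean = {
          if (!exhausted && lookahead.isEmpty) {
            buffer.take() match { // blocks until the primary side produces
              case some @ Some(_) => lookahead = some
              case None           => exhausted = true
            }
          }
          !exhausted
        }

        override def next(): T = {
          if (!hasNext) throw new NoSuchElementException("fork exhausted")
          val elem = lookahead.get
          lookahead = None
          elem
        }
      }
    }

The unbounded put is what removes the deadlock: the writing side never blocks on the deque. The flip side is exactly the OOM risk above, since nothing bounds how far the fork can fall behind the primary consumer.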
Could you create a JIRA and send out the PR?

On Tue, Jun 23, 2015 at 3:27 PM, Justin Uang <justin.u...@gmail.com> wrote:
> BLUF: BatchPythonEvaluation's implementation is unusable at large scale,
> but I have a proof-of-concept implementation that avoids caching the
> entire dataset.
>
> Hi,
>
> We have been running into performance problems using Python UDFs with
> DataFrames at large scale.
>
> From the implementation of BatchPythonEvaluation, it looks like the goal
> was to reuse the PythonRDD code. It caches the entire child RDD so that it
> can do two passes over the data: one to feed the PythonRDD, and one to
> join the Python lambda results with the original row (which may contain
> Java objects that should be passed through).
>
> In addition, it caches all the columns, even the ones that don't need to
> be processed by the Python UDF. In the cases I was working with, I had a
> 500-column table, I wanted to use a Python UDF on one column, and it ended
> up caching all 500 columns.
>
> I have a working solution here that does it in one pass over the data,
> avoiding the caching
> (https://github.com/justinuang/spark/commit/c1a415a18d31226ac580f1a9df7985571d03199b).
> With this patch, I go from a job that takes 20 minutes and then OOMs to a
> job that finishes completely in 3 minutes. It is admittedly quite hacky
> and prone to deadlocks, since there is buffering in many locations:
>
> - NEW: the ForkingIterator LinkedBlockingDeque
> - batching the rows before pickling them
> - OS buffers on both sides
> - pyspark.serializers.BatchedSerializer
>
> We can avoid deadlock by being very disciplined. For example, we can have
> the ForkingIterator always check whether the LinkedBlockingDeque is full
> and, if so:
>
> Java
> - flush the Java pickling buffer
> - send a flush command to the Python process
> - os.flush the Java side
>
> Python
> - flush BatchedSerializer
> - os.flush()
>
> I haven't added this yet, and it is getting very complex. Another model
> would be to change the protocol between the Java side and the worker to be
> a synchronous request/response. This has the disadvantage that the CPU
> isn't doing anything while a batch is being sent across, but it has the
> huge advantage of simplicity. In addition, I suspect the bottleneck isn't
> the actual IO between the processes, but rather the serialization of Java
> objects into pickled bytes, and the deserialization/serialization plus the
> Python loops on the Python side. Another advantage is that we won't use
> more than 100% CPU, since only one thread at a time is doing CPU work
> between the executor and the Python interpreter.
>
> Any thoughts would be much appreciated =)
>
> Other improvements:
> - extract some of the worker code out of PythonRDD so that we can do a
> mapPartitions directly in BatchedPythonEvaluation without resorting to the
> hackery in ForkedRDD.compute(), which uses a cache to ensure that the
> other RDD can get a handle to the same iterator
> - read elements and use a size estimator when creating the BlockingQueue,
> to make sure that we don't store too many things in memory when batching
> - patch Unpickler to not use StopException for control flow, which is
> slowing down the Java side
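Regarding the synchronous request/response model in the quoted message, here is a rough sketch of the shape it could take on the JVM side (hypothetical length-prefixed framing and names, not Spark's actual worker protocol): write one batch, then block reading the results for that batch before sending the next.

    import java.io.{DataInputStream, DataOutputStream}

    object SyncPythonProtocol {
      // Hypothetical framing: each request and response is one
      // length-prefixed blob of pickled rows.
      def evalBatches(
          batches: Iterator[Array[Byte]], // pickled input batches
          toWorker: DataOutputStream,     // JVM -> Python worker
          fromWorker: DataInputStream     // Python worker -> JVM
      ): Iterator[Array[Byte]] = {
        batches.map { batch =>
          // Request: send exactly one batch and flush everything,
          // so nothing stays buffered across the round trip.
          toWorker.writeInt(batch.length)
          toWorker.write(batch)
          toWorker.flush()

          // Response: block until the worker has evaluated the whole batch.
          val resultLength = fromWorker.readInt()
          val result = new Array[Byte](resultLength)
          fromWorker.readFully(result)
          result
        }
      }
    }

With at most one batch in flight, none of the four buffering layers listed in the quoted message can fill up against each other; the cost is the idle CPU during each round trip.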