Thanks for looking into it. I like the idea of having a
ForkingIterator. If we give it an unlimited buffer, then I think we
won't have the deadlock problem. The writing thread will be blocked by
the Python process, so not many rows should end up buffered (though it
could still be a cause of OOM). At least this approach is better than
the current one.
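
Just to make sure we are talking about the same thing, here is a minimal
sketch of what I mean by an unbounded ForkingIterator (the names and
structure are illustrative, not the actual patch):

    import java.util.concurrent.LinkedBlockingDeque

    // Sketch: wrap the child iterator so a second consumer can re-read
    // the same rows when joining the Python results back on.
    class ForkingIterator[T](underlying: Iterator[T]) extends Iterator[T] {
      // No capacity limit: the writing side never blocks, so it cannot
      // deadlock against the Python process; the cost is that memory is
      // only bounded by how far the Python side falls behind (the OOM risk).
      private val buffer = new LinkedBlockingDeque[T]()

      override def hasNext: Boolean = underlying.hasNext
      override def next(): T = {
        val row = underlying.next()
        buffer.add(row) // never blocks, the deque is unbounded
        row
      }

      // Iterator handed to the side that joins Python results back onto
      // the rows; assumes the primary side keeps pulling rows, otherwise
      // take() would block here.
      def fork: Iterator[T] = new Iterator[T] {
        override def hasNext: Boolean = !buffer.isEmpty || underlying.hasNext
        override def next(): T = buffer.take()
      }
    }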

Could you create a JIRA and send out the PR?

On Tue, Jun 23, 2015 at 3:27 PM, Justin Uang <justin.u...@gmail.com> wrote:
> BLUF: BatchPythonEvaluation's implementation is unusable at large scale, but
> I have a proof-of-concept implementation that avoids caching the entire
> dataset.
>
> Hi,
>
> We have been running into performance problems using Python UDFs with
> DataFrames at large scale.
>
> From the implementation of BatchPythonEvaluation, it looks like the goal was
> to reuse the PythonRDD code. It caches the entire child RDD so that it can
> do two passes over the data: one to feed the PythonRDD, then one to join
> the Python lambda results with the original rows (which may contain Java
> objects that should be passed through).
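>
> To make that shape concrete, here is an illustration using plain Scala
> collections in place of the real RDD / Python-worker machinery (pythonUdf
> below is just a stand-in for the round trip to the worker, not code from
> the patch):
>
>     object TwoPassIllustration {
>       def main(args: Array[String]): Unit = {
>         val childRows = Iterator(Seq[Any]("a", 1), Seq[Any]("b", 2))
>         val pythonUdf: Any => Any = v => s"udf($v)" // stand-in for the Python round trip
>
>         val cached = childRows.toVector // materializes every row up front
>         val udfResults = cached.map(r => pythonUdf(r.head)) // pass 1: run the UDF column
>         // pass 2: join the UDF results back onto the original rows
>         val output = cached.zip(udfResults).map { case (row, res) => row :+ res }
>         output.foreach(println)
>       }
>     }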
>
> In addition, it caches all the columns, even the ones that don't need to be
> processed by the Python UDF. In the cases I was working with, I had a
> 500-column table, I wanted to use a Python UDF for one column, and it ended
> up caching all 500 columns.
>
> I have a working solution over here that does it in one pass over the data,
> avoiding caching
> (https://github.com/justinuang/spark/commit/c1a415a18d31226ac580f1a9df7985571d03199b).
> With this patch, I go from a job that takes 20 minutes then OOMs, to a job
> that finishes completely in 3 minutes. It is indeed quite hacky and prone to
> deadlocks since there is buffering in many locations:
>
>     - NEW: the ForkingIterator LinkedBlockingDeque
>     - batching the rows before pickling them
>     - os buffers on both sides
>     - pyspark.serializers.BatchedSerializer
>
> We can avoid deadlock by being very disciplined. For example, the
> ForkingIterator could instead always check whether the LinkedBlockingDeque
> is full and, if so (see the sketch after these lists):
>
> Java
>     - flush the java pickling buffer
>     - send a flush command to the python process
>     - os.flush the java side
>
> Python
>     - flush BatchedSerializer
>     - os.flush()
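>
> On the Java side, that check would look roughly like the sketch below.
> All the names here (FLUSH_COMMAND, the pickling callbacks, etc.) are
> placeholders, not the real PythonRDD protocol:
>
>     import java.io.DataOutputStream
>     import java.util.concurrent.LinkedBlockingDeque
>
>     class DisciplinedWriter[T](
>         forkBuffer: LinkedBlockingDeque[T],
>         dataOut: DataOutputStream,
>         pickleAndBuffer: T => Unit,      // stand-in for the Java pickling buffer
>         flushPickleBuffer: () => Unit) {
>
>       // Hypothetical marker telling the worker to flush its BatchedSerializer
>       // and os.flush(); not an actual protocol constant.
>       private val FLUSH_COMMAND = -100
>
>       def writeRow(row: T): Unit = {
>         if (forkBuffer.remainingCapacity() == 0) {
>           flushPickleBuffer()             // 1. flush the Java pickling buffer
>           dataOut.writeInt(FLUSH_COMMAND) // 2. send a flush command to the worker
>           dataOut.flush()                 // 3. flush the OS buffer on the Java side
>         }
>         // May still block until the join side drains a row, but the flushes
>         // above let the Python process make progress, so the join side will
>         // eventually drain it.
>         forkBuffer.put(row)
>         pickleAndBuffer(row)
>       }
>     }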
>
> I haven't added this yet, and it is getting very complex. Another model
> would be to change the protocol between the Java side and the worker to a
> synchronous request/response. This has the disadvantage that the CPU isn't
> doing anything while a batch is being sent across, but it has the huge
> advantage of simplicity. In addition, I suspect the bottleneck isn't the
> actual IO between the processes, but rather the serialization of Java
> objects into pickled bytes, plus the deserialization/serialization and the
> Python loops on the Python side. Another advantage is that we won't use
> more than 100% CPU, since only one thread at a time is doing CPU work
> between the executor and the Python interpreter.
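>
> In that model the whole exchange collapses to something like the sketch
> below (all names are placeholders, not the actual protocol):
>
>     // Placeholder sketch of the synchronous request/response model.
>     class SyncPythonRunner[T, R](
>         batchSize: Int,
>         sendBatch: Seq[T] => Unit,     // pickle + write one batch, then flush
>         readResults: Int => Seq[R]) {  // block until the worker returns n results
>
>       def run(rows: Iterator[T]): Iterator[(T, R)] =
>         rows.grouped(batchSize).flatMap { batch =>
>           sendBatch(batch)                      // request: ship one batch
>           val results = readResults(batch.size) // response: wait for the UDF outputs
>           batch.iterator.zip(results.iterator)  // join results back onto the rows
>         }
>     }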
>
> Any thoughts would be much appreciated =)
>
> Other improvements:
>     - extract some of the worker code out of PythonRDD so that we can do a
> mapPartitions directly in BatchPythonEvaluation without resorting to the
> hackery in ForkedRDD.compute(), which uses a cache to ensure that the other
> RDD can get a handle to the same iterator
>     - read elements and use a size estimator to size the BlockingQueue, so
> that we don't store too many things in memory when batching
>     - patch Unpickler to not use StopException for control flow, since that
> is slowing down the Java side
>
>
