BLUF: BatchPythonEvaluation's implementation is unusable at large scale,
but I have a proof-of-concept implementation that avoids caching the entire
dataset.

Hi,

We have been running into performance problems using Python UDFs with
DataFrames at large scale.

From the implementation of BatchPythonEvaluation, it looks like the goal
was to reuse the PythonRDD code. It caches the entire child RDD so that it
can do two passes over the data: one to feed the PythonRDD, and one to
join the Python lambda results with the original rows (which may contain
Java objects that should be passed through).

In addition, it caches all the columns, even the ones that don't need to be
processed by the Python UDF. In the case I was working with, I had a
500-column table and wanted to use a Python UDF on one column, and it ended
up caching all 500 columns.
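
To make the column point concrete, here is a minimal sketch in plain Python
(no Spark involved). Rows are modeled as dicts and project_udf_inputs is a
made-up name for illustration, not something in Spark or in the patch. Only
the UDF's argument columns are extracted for the Python side; the other
columns never need to be copied or cached for it.

```python
def project_udf_inputs(rows, udf_columns):
    """Yield only the UDF's argument columns from each row.

    Assumption for this sketch: each row is a dict keyed by column name.
    """
    for row in rows:
        yield tuple(row[c] for c in udf_columns)


# A wide table where the UDF reads a single column.
wide_rows = [{"id": i, "name": "n%d" % i, "payload": "x" * 10} for i in range(3)]
projected = list(project_udf_inputs(wide_rows, ["id"]))
```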

I have a working solution over here that does it in one pass over the data,
avoiding caching (
https://github.com/justinuang/spark/commit/c1a415a18d31226ac580f1a9df7985571d03199b).
With this patch, a job that used to run for 20 minutes and then OOM now
finishes in 3 minutes. The patch is admittedly quite hacky and prone to
deadlock, since there is buffering in many places:

    - NEW: the ForkingIterator LinkedBlockingDeque
    - batching the rows before pickling them
    - OS buffers on both sides
    - pyspark.serializers.BatchedSerializer
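
The one-pass idea can be sketched in plain Python as below. This is not the
code from the commit: evaluate_one_pass, arg_of and max_pending are
hypothetical names, a thread stands in for the Python worker process, and
queue.Queue stands in for the LinkedBlockingDeque. Each input row is read
exactly once; the original row waits in a bounded queue while its UDF
argument goes to the worker, and the two are joined again when the result
comes back. The sketch stays deadlock-free only because the worker does no
batching of its own; with any of the extra buffers listed above in the
path, a full pending queue could block the feeder while the worker is still
waiting for more input.

```python
import queue
import threading

_DONE = object()  # sentinel marking end of stream


def evaluate_one_pass(rows, udf, arg_of, max_pending=4):
    """Yield (original_row, udf_result) pairs in a single pass over rows.

    Sketch only: an exception raised inside udf is not propagated here.
    """
    pending = queue.Queue(maxsize=max_pending)  # rows awaiting their result
    to_worker = queue.Queue()                   # UDF arguments, in row order
    results = queue.Queue()                     # UDF results, in row order

    def feed():
        for row in rows:
            pending.put(row)            # blocks while max_pending rows are waiting
            to_worker.put(arg_of(row))  # only the UDF argument is sent across
        to_worker.put(_DONE)

    def work():
        while True:
            arg = to_worker.get()
            if arg is _DONE:
                results.put(_DONE)
                return
            results.put(udf(arg))

    threading.Thread(target=feed, daemon=True).start()
    threading.Thread(target=work, daemon=True).start()

    while True:
        result = results.get()
        if result is _DONE:
            return
        # The row was queued before its argument was sent, so it is present.
        yield pending.get(), result


rows = [(i, "passthrough-%d" % i) for i in range(5)]
joined = list(
    evaluate_one_pass(rows, udf=lambda v: v * 2, arg_of=lambda r: r[0], max_pending=2)
)
```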

We can avoid deadlock by being very disciplined. For example, we can have
the ForkingIterator always check whether the LinkedBlockingDeque is full
and, if so:

Java
    - flush the Java pickling buffer
    - send a flush command to the Python process
    - flush the OS buffer on the Java side

Python
    - flush BatchedSerializer
    - os.flush()

I haven't added this yet, and it is getting very complex. Another model
would be to change the protocol between the Java side and the worker to a
synchronous request/response. The disadvantage is that the CPU sits idle
while a batch is being sent across, but it has the huge advantage of
simplicity. In addition, I imagine that the actual IO between the processes
isn't the slow part; the cost is more likely in the serialization of Java
objects into pickled bytes, and in the deserialization/serialization plus
Python loops on the Python side. Another advantage is that we won't be
taking more than 100% CPU, since only one of the executor and the Python
interpreter is doing CPU work at a time.
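
A rough sketch of what such a synchronous protocol could look like, written
in plain Python for illustration. Nothing here is the existing PythonRDD
wire format: the 4-byte length prefix, the use of None as an end-of-stream
marker, and the names send_msg, recv_msg, worker_loop and evaluate_sync are
all assumptions, and a thread on a socketpair stands in for the worker
process. The driver sends one batch, then blocks until the reply for that
batch arrives, so only one side is ever writing and the deadlock cases
above cannot occur.

```python
import pickle
import socket
import struct
import threading


def send_msg(sock, obj):
    """Send one pickled object with a 4-byte big-endian length prefix."""
    data = pickle.dumps(obj)
    sock.sendall(struct.pack("!I", len(data)) + data)


def recv_exact(sock, n):
    """Read exactly n bytes or raise EOFError if the peer closes early."""
    buf = bytearray()
    while len(buf) < n:
        chunk = sock.recv(n - len(buf))
        if not chunk:
            raise EOFError("peer closed the connection")
        buf.extend(chunk)
    return bytes(buf)


def recv_msg(sock):
    (n,) = struct.unpack("!I", recv_exact(sock, 4))
    return pickle.loads(recv_exact(sock, n))


def worker_loop(sock, udf):
    """Worker side: one reply per request; None means end of stream."""
    while True:
        batch = recv_msg(sock)
        if batch is None:
            return
        send_msg(sock, [udf(x) for x in batch])


def _round_trip(sock, batch, arg_of):
    send_msg(sock, [arg_of(r) for r in batch])  # request: UDF arguments only
    return zip(batch, recv_msg(sock))           # blocks until the reply arrives


def evaluate_sync(rows, arg_of, sock, batch_size=2):
    """Driver side: yield (original_row, result) pairs, one batch at a time."""
    batch = []
    for row in rows:
        batch.append(row)
        if len(batch) == batch_size:
            yield from _round_trip(sock, batch, arg_of)
            batch = []
    if batch:
        yield from _round_trip(sock, batch, arg_of)
    send_msg(sock, None)


driver_sock, worker_sock = socket.socketpair()
worker = threading.Thread(
    target=worker_loop, args=(worker_sock, lambda v: v + 1), daemon=True
)
worker.start()
out = list(evaluate_sync([(1, "a"), (2, "b"), (3, "c")], lambda r: r[0], driver_sock))
worker.join()
driver_sock.close()
worker_sock.close()
```

Because each batch is joined with its reply before the next batch is read,
the original rows never need to be cached beyond one batch.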

Any thoughts would be much appreciated =)

Other improvements:
    - extract some of the worker code out of PythonRDD so that we can do a
      mapPartitions directly in BatchPythonEvaluation without resorting to
      the hackery in ForkedRDD.compute(), which uses a cache to ensure that
      the other RDD can get a handle to the same iterator
    - read elements and use a size estimator to create the BlockingQueue,
      to make sure that we don't store too many things in memory when
      batching
    - patch Unpickler to not use StopException for control flow, which is
      slowing down the Java side
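
For the size-estimator item, a minimal sketch of batching by estimated
bytes rather than by row count, again in plain Python. batch_by_size is a
hypothetical helper, and the default estimator (length of the pickled row)
is only a stand-in for whatever size estimator the JVM side would really
use.

```python
import pickle


def batch_by_size(rows, max_bytes, estimate=lambda r: len(pickle.dumps(r))):
    """Group rows into batches whose estimated total size stays <= max_bytes.

    A single row larger than max_bytes still gets a batch of its own.
    """
    batch, size = [], 0
    for row in rows:
        row_size = estimate(row)
        if batch and size + row_size > max_bytes:
            yield batch
            batch, size = [], 0
        batch.append(row)
        size += row_size
    if batch:
        yield batch


# Using len() as the estimator makes the grouping easy to follow.
batches = list(batch_by_size(["aaaa", "bb", "cccccc", "d"], max_bytes=6, estimate=len))
```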
