Hi Davies,

In general, do we expect people to use CPython only for "heavyweight" UDFs
that invoke an external library? Are there any examples of using Jython,
especially performance comparisons to Java/Scala and CPython? When using
Jython, do you expect the driver to send code to the executor as a string,
or is there a good way to serialize Jython lambdas?

(For context, I was unable to serialize Nashorn lambdas when I tried to use
them in Spark.)

Punya
On Wed, Jun 24, 2015 at 2:26 AM Davies Liu <dav...@databricks.com> wrote:

> Fair points; I also like simpler solutions.
>
> The overhead of a Python task could be a few milliseconds, which
> means we should also evaluate them in batches (one Python task per batch).
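>
> To make that concrete, here is a rough back-of-the-envelope sketch (the
> overhead figure and batch size are assumptions, purely for illustration
> of why batching matters):
>
>     rows = 1000000
>     overhead_per_task_s = 0.002  # assume a few ms per Python task
>     batch_size = 1000
>
>     # one Python task per row vs. one Python task per batch
>     unbatched_s = rows * overhead_per_task_s               # 2000 s
>     batched_s = (rows / batch_size) * overhead_per_task_s  # 2 s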
>
> Decreasing the batch size for UDFs sounds reasonable to me, together
> with other tricks to reduce the data sitting in the socket/pipe buffers.
>
> BTW, what do your UDFs look like? How about using Jython to run
> simple Python UDFs (ones without external libraries)?
>
> On Tue, Jun 23, 2015 at 8:21 PM, Justin Uang <justin.u...@gmail.com>
> wrote:
> > // + punya
> >
> > Thanks for your quick response!
> >
> > I'm not sure that using an unbounded buffer is a good solution to the
> > locking problem. For example, in the situation where I had 500 columns,
> > I am in fact storing 499 extra columns on the Java side, which might
> > make me OOM if I have to store many rows. In addition, if I am using an
> > AutoBatchedSerializer, the Java side might have to write 1 << 16 == 65536
> > rows before Python starts outputting elements, in which case the Java
> > side has to buffer 65536 complete rows. In general it seems fragile to
> > rely on blocking behavior in the Python coprocess. By contrast, it's
> > very easy to verify the correctness and performance characteristics of
> > the synchronous blocking solution.
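> >
> > To put rough numbers on the buffering (the per-value size here is an
> > assumption, purely for illustration):
> >
> >     rows_buffered = 1 << 16  # 65536 rows before Python outputs anything
> >     columns = 500
> >     bytes_per_value = 100    # assumed average serialized size
> >     buffered_bytes = rows_buffered * columns * bytes_per_value
> >     print(buffered_bytes / float(1024 ** 3))  # ~3 GiB on the Java side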
> >
> >
> > On Tue, Jun 23, 2015 at 7:21 PM Davies Liu <dav...@databricks.com>
> wrote:
> >>
> >> Thanks for looking into it. I like the idea of having a
> >> ForkingIterator. If we give it an unlimited buffer, then we will not
> >> have the deadlock problem, I think. The writing thread will be blocked
> >> by the Python process, so there will not be many rows buffered (though
> >> that could still be a cause of OOM). At least, this approach is better
> >> than the current one.
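> >>
> >> As a minimal sketch of the idea (in Python rather than the actual
> >> Java, with made-up names), the forked side buffers rows as they are
> >> fed to the Python process, so the join side can replay them later:
> >>
> >>     from collections import deque
> >>
> >>     class ForkingIterator:
> >>         def __init__(self, source):
> >>             self.source = iter(source)
> >>             self.buffer = deque()  # unbounded, like an unlimited
> >>                                    # LinkedBlockingDeque
> >>
> >>         def __iter__(self):
> >>             for row in self.source:
> >>                 self.buffer.append(row)  # keep a copy for the join side
> >>                 yield row                # feed the Python process
> >>
> >>         def replay(self):
> >>             while self.buffer:
> >>                 yield self.buffer.popleft()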
> >>
> >> Could you create a JIRA and send out the PR?
> >>
> >> On Tue, Jun 23, 2015 at 3:27 PM, Justin Uang <justin.u...@gmail.com>
> >> wrote:
> >> > BLUF: BatchPythonEvaluation's implementation is unusable at large
> >> > scale, but I have a proof-of-concept implementation that avoids
> >> > caching the entire dataset.
> >> >
> >> > Hi,
> >> >
> >> > We have been running into performance problems using Python UDFs with
> >> > DataFrames at large scale.
> >> >
> >> > From the implementation of BatchPythonEvaluation, it looks like the
> >> > goal was to reuse the PythonRDD code. It caches the entire child RDD
> >> > so that it can do two passes over the data: one to feed the
> >> > PythonRDD, then one to join the Python lambda results with the
> >> > original rows (which may have Java objects that should be passed
> >> > through).
> >> >
> >> > In addition, it caches all the columns, even the ones that don't
> >> > need to be processed by the Python UDF. In the cases I was working
> >> > with, I had a 500-column table, and I wanted to use a Python UDF for
> >> > one column, and it ended up caching all 500 columns.
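> >> >
> >> > Conceptually, the current flow looks something like this (a toy
> >> > Python sketch of the Scala logic; the names and stand-in UDF are
> >> > invented):
> >> >
> >> >     child_rows = [{"udf_col": i, "other": "x" * 100}
> >> >                   for i in range(5)]
> >> >
> >> >     def run_python_udf(values):  # stand-in for the worker round trip
> >> >         return (v * 2 for v in values)
> >> >
> >> >     cached = list(child_rows)    # caches ALL columns, not just one
> >> >     udf_inputs = (row["udf_col"] for row in cached)          # pass 1
> >> >     joined = list(zip(cached, run_python_udf(udf_inputs)))   # pass 2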
> >> >
> >> > I have a working solution over here that does it in one pass over
> >> > the data, avoiding the caching:
> >> >
> >> > https://github.com/justinuang/spark/commit/c1a415a18d31226ac580f1a9df7985571d03199b
> >> >
> >> > With this patch, I go from a job that takes 20 minutes and then
> >> > OOMs to a job that finishes completely in 3 minutes. It is indeed
> >> > quite hacky and prone to deadlocks, since there is buffering in many
> >> > locations:
> >> >
> >> >     - NEW: the ForkingIterator LinkedBlockingDeque
> >> >     - batching the rows before pickling them
> >> >     - os buffers on both sides
> >> >     - pyspark.serializers.BatchedSerializer
> >> >
> >> > We can avoid deadlock by being very disciplined. For example, we
> >> > can have the ForkingIterator always check whether the
> >> > LinkedBlockingDeque is full, and if so:
> >> >
> >> > Java
> >> >     - flush the Java pickling buffer
> >> >     - send a flush command to the Python process
> >> >     - flush the OS-level buffer on the Java side
> >> >
> >> > Python
> >> >     - flush the BatchedSerializer
> >> >     - os.flush()
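> >> >
> >> > On the Python side, handling such a flush command might look roughly
> >> > like this (the control code and helper callables are invented for
> >> > illustration):
> >> >
> >> >     import struct
> >> >
> >> >     FLUSH = -3  # hypothetical control code (an assumption)
> >> >
> >> >     def worker_loop(infile, outfile, eval_batch, flush_serializer):
> >> >         while True:
> >> >             header = infile.read(4)
> >> >             if not header:
> >> >                 break
> >> >             (length,) = struct.unpack("!i", header)
> >> >             if length == FLUSH:
> >> >                 flush_serializer(outfile)  # drain BatchedSerializer
> >> >                 outfile.flush()            # then flush at OS level
> >> >             else:
> >> >                 eval_batch(infile.read(length), outfile)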
> >> >
> >> > I haven't added this yet; this is getting very complex, however.
> >> > Another model would be to change the protocol between the Java side
> >> > and the worker to a synchronous request/response. This has the
> >> > disadvantage that the CPU isn't doing anything while a batch is
> >> > being sent across, but it has the huge advantage of simplicity. In
> >> > addition, I imagine that the actual IO between the processes isn't
> >> > that slow; the cost is rather the serialization of Java objects into
> >> > pickled bytes, and the deserialization/serialization plus the Python
> >> > loops on the Python side. Another advantage is that we won't be
> >> > taking more than 100% CPU, since only one thread is doing CPU work
> >> > at a time between the executor and the Python interpreter.
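> >> >
> >> > A sketch of the synchronous round trip (in Python for brevity; the
> >> > framing is invented): send one pickled batch, then block until the
> >> > full result batch comes back before sending the next one.
> >> >
> >> >     import pickle
> >> >     import struct
> >> >
> >> >     def eval_batch_sync(rows, to_python, from_python):
> >> >         payload = pickle.dumps(rows)
> >> >         to_python.write(struct.pack("!i", len(payload)))
> >> >         to_python.write(payload)
> >> >         to_python.flush()  # request is fully sent before we block
> >> >         (length,) = struct.unpack("!i", from_python.read(4))
> >> >         return pickle.loads(from_python.read(length))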
> >> >
> >> > Any thoughts would be much appreciated =)
> >> >
> >> > Other improvements:
> >> >     - extract some of the worker code out of PythonRDD so that we
> >> > can do a mapPartitions directly in BatchPythonEvaluation without
> >> > resorting to the hackery in ForkedRDD.compute(), which uses a cache
> >> > to ensure that the other RDD can get a handle to the same iterator
> >> >     - read elements and use a size estimator to create the
> >> > BlockingQueue, to make sure that we don't store too many things in
> >> > memory when batching (see the sketch below)
> >> >     - patch Unpickler to not use StopException for control flow,
> >> > which is slowing down the Java side
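> >> >
> >> > For the size-estimator idea, something along these lines (the
> >> > heuristic and memory budget are assumptions; the sample must be
> >> > non-empty):
> >> >
> >> >     import sys
> >> >
> >> >     def queue_capacity(sample_rows, memory_budget_bytes):
> >> >         # size the queue so buffered rows stay within the budget
> >> >         per_row = max(sys.getsizeof(r) for r in sample_rows)
> >> >         return max(1, memory_budget_bytes // per_row)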
> >> >
> >> >
>
