>From you comment, the 2x improvement only happens when you have the
batch size as 1, right?

On Wed, Jun 24, 2015 at 12:11 PM, Justin Uang <justin.u...@gmail.com> wrote:
> FYI, just submitted a PR to Pyrolite to remove their StopException.
> https://github.com/irmen/Pyrolite/pull/30
>
> With my benchmark, removing it basically made it about 2x faster.
>
> On Wed, Jun 24, 2015 at 8:33 AM Punyashloka Biswal <punya.bis...@gmail.com>
> wrote:
>>
>> Hi Davies,
>>
>> In general, do we expect people to use CPython only for "heavyweight" UDFs
>> that invoke an external library? Are there any examples of using Jython,
>> especially performance comparisons to Java/Scala and CPython? When using
>> Jython, do you expect the driver to send code to the executor as a string,
>> or is there a good way to serialized Jython lambdas?
>>
>> (For context, I was unable to serialize Nashorn lambdas when I tried to
>> use them in Spark.)
>>
>> Punya
>> On Wed, Jun 24, 2015 at 2:26 AM Davies Liu <dav...@databricks.com> wrote:
>>>
>>> Fare points, I also like simpler solutions.
>>>
>>> The overhead of Python task could be a few of milliseconds, which
>>> means we also should eval them as batches (one Python task per batch).
>>>
>>> Decreasing the batch size for UDF sounds reasonable to me, together
>>> with other tricks to reduce the data in socket/pipe buffer.
>>>
>>> BTW, what do your UDF looks like? How about to use Jython to run
>>> simple Python UDF (without some external libraries).
>>>
>>> On Tue, Jun 23, 2015 at 8:21 PM, Justin Uang <justin.u...@gmail.com>
>>> wrote:
>>> > // + punya
>>> >
>>> > Thanks for your quick response!
>>> >
>>> > I'm not sure that using an unbounded buffer is a good solution to the
>>> > locking problem. For example, in the situation where I had 500 columns,
>>> > I am
>>> > in fact storing 499 extra columns on the java side, which might make me
>>> > OOM
>>> > if I have to store many rows. In addition, if I am using an
>>> > AutoBatchedSerializer, the java side might have to write 1 << 16 ==
>>> > 65536
>>> > rows before python starts outputting elements, in which case, the Java
>>> > side
>>> > has to buffer 65536 complete rows. In general it seems fragile to rely
>>> > on
>>> > blocking behavior in the Python coprocess. By contrast, it's very easy
>>> > to
>>> > verify the correctness and performance characteristics of the
>>> > synchronous
>>> > blocking solution.
>>> >
>>> >
>>> > On Tue, Jun 23, 2015 at 7:21 PM Davies Liu <dav...@databricks.com>
>>> > wrote:
>>> >>
>>> >> Thanks for looking into it, I'd like the idea of having
>>> >> ForkingIterator. If we have unlimited buffer in it, then will not have
>>> >> the problem of deadlock, I think. The writing thread will be blocked
>>> >> by Python process, so there will be not much rows be buffered(still be
>>> >> a reason to OOM). At least, this approach is better than current one.
>>> >>
>>> >> Could you create a JIRA and sending out the PR?
>>> >>
>>> >> On Tue, Jun 23, 2015 at 3:27 PM, Justin Uang <justin.u...@gmail.com>
>>> >> wrote:
>>> >> > BLUF: BatchPythonEvaluation's implementation is unusable at large
>>> >> > scale,
>>> >> > but
>>> >> > I have a proof-of-concept implementation that avoids caching the
>>> >> > entire
>>> >> > dataset.
>>> >> >
>>> >> > Hi,
>>> >> >
>>> >> > We have been running into performance problems using Python UDFs
>>> >> > with
>>> >> > DataFrames at large scale.
>>> >> >
>>> >> > From the implementation of BatchPythonEvaluation, it looks like the
>>> >> > goal
>>> >> > was
>>> >> > to reuse the PythonRDD code. It caches the entire child RDD so that
>>> >> > it
>>> >> > can
>>> >> > do two passes over the data. One to give to the PythonRDD, then one
>>> >> > to
>>> >> > join
>>> >> > the python lambda results with the original row (which may have java
>>> >> > objects
>>> >> > that should be passed through).
>>> >> >
>>> >> > In addition, it caches all the columns, even the ones that don't
>>> >> > need to
>>> >> > be
>>> >> > processed by the Python UDF. In the cases I was working with, I had
>>> >> > a
>>> >> > 500
>>> >> > column table, and i wanted to use a python UDF for one column, and
>>> >> > it
>>> >> > ended
>>> >> > up caching all 500 columns.
>>> >> >
>>> >> > I have a working solution over here that does it in one pass over
>>> >> > the
>>> >> > data,
>>> >> > avoiding caching
>>> >> >
>>> >> >
>>> >> > (https://github.com/justinuang/spark/commit/c1a415a18d31226ac580f1a9df7985571d03199b).
>>> >> > With this patch, I go from a job that takes 20 minutes then OOMs, to
>>> >> > a
>>> >> > job
>>> >> > that finishes completely in 3 minutes. It is indeed quite hacky and
>>> >> > prone to
>>> >> > deadlocks since there is buffering in many locations:
>>> >> >
>>> >> >     - NEW: the ForkingIterator LinkedBlockingDeque
>>> >> >     - batching the rows before pickling them
>>> >> >     - os buffers on both sides
>>> >> >     - pyspark.serializers.BatchedSerializer
>>> >> >
>>> >> > We can avoid deadlock by being very disciplined. For example, we can
>>> >> > have
>>> >> > the ForkingIterator instead always do a check of whether the
>>> >> > LinkedBlockingDeque is full and if so:
>>> >> >
>>> >> > Java
>>> >> >     - flush the java pickling buffer
>>> >> >     - send a flush command to the python process
>>> >> >     - os.flush the java side
>>> >> >
>>> >> > Python
>>> >> >     - flush BatchedSerializer
>>> >> >     - os.flush()
>>> >> >
>>> >> > I haven't added this yet. This is getting very complex however.
>>> >> > Another
>>> >> > model would just be to change the protocol between the java side and
>>> >> > the
>>> >> > worker to be a synchronous request/response. This has the
>>> >> > disadvantage
>>> >> > that
>>> >> > the CPU isn't doing anything when the batch is being sent across,
>>> >> > but it
>>> >> > has
>>> >> > the huge advantage of simplicity. In addition, I imagine that the
>>> >> > actual
>>> >> > IO
>>> >> > between the processes isn't that slow, but rather the serialization
>>> >> > of
>>> >> > java
>>> >> > objects into pickled bytes, and the deserialization/serialization +
>>> >> > python
>>> >> > loops on the python side. Another advantage is that we won't be
>>> >> > taking
>>> >> > more
>>> >> > than 100% CPU since only one thread is doing CPU work at a time
>>> >> > between
>>> >> > the
>>> >> > executor and the python interpreter.
>>> >> >
>>> >> > Any thoughts would be much appreciated =)
>>> >> >
>>> >> > Other improvements:
>>> >> >     - extract some code of the worker out of PythonRDD so that we
>>> >> > can do
>>> >> > a
>>> >> > mapPartitions directly in BatchedPythonEvaluation without resorting
>>> >> > to
>>> >> > the
>>> >> > hackery in ForkedRDD.compute(), which uses a cache to ensure that
>>> >> > the
>>> >> > other
>>> >> > RDD can get a handle to the same iterator.
>>> >> >     - read elements and use a size estimator to create the
>>> >> > BlockingQueue
>>> >> > to
>>> >> > make sure that we don't store too many things in memory when
>>> >> > batching
>>> >> >     - patch Unpickler to not use StopException for control flow,
>>> >> > which
>>> >> > is
>>> >> > slowing down the java side
>>> >> >
>>> >> >

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@spark.apache.org
For additional commands, e-mail: dev-h...@spark.apache.org

Reply via email to