FYI, just submitted a PR to Pyrolite to remove their StopException. https://github.com/irmen/Pyrolite/pull/30
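For context, the pattern that PR removes looks roughly like the first method below. A toy Scala sketch (not Pyrolite's actual code) of exception-driven versus sentinel-driven loop termination:

    // Toy sketch (not Pyrolite's code): signalling "end of pickle stream" by
    // throwing pays for a stack-trace capture on every throw, while returning
    // a shared sentinel object costs the caller only a reference comparison.
    object ControlFlowSketch {
      val Stop = new Object // sentinel standing in for the pickle STOP opcode

      // Exception-based: constructing the exception fills in the stack trace.
      def dispatchWithThrow(opcode: Int): AnyRef =
        if (opcode == 0x2e) throw new RuntimeException("STOP") // '.' opcode
        else Int.box(opcode)

      // Sentinel-based: the caller compares against Stop instead of catching.
      def dispatchWithSentinel(opcode: Int): AnyRef =
        if (opcode == 0x2e) Stop
        else Int.box(opcode)
    }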
With my benchmark, removing it made it about 2x faster.

On Wed, Jun 24, 2015 at 8:33 AM Punyashloka Biswal <punya.bis...@gmail.com> wrote:

> Hi Davies,
>
> In general, do we expect people to use CPython only for "heavyweight" UDFs that invoke an external library? Are there any examples of using Jython, especially performance comparisons to Java/Scala and CPython? When using Jython, do you expect the driver to send code to the executor as a string, or is there a good way to serialize Jython lambdas?
>
> (For context, I was unable to serialize Nashorn lambdas when I tried to use them in Spark.)
>
> Punya
>
> On Wed, Jun 24, 2015 at 2:26 AM Davies Liu <dav...@databricks.com> wrote:
>
>> Fair points; I also like simpler solutions.
>>
>> The overhead of a Python task can be a few milliseconds, which means we should also evaluate UDFs in batches (one Python task per batch).
>>
>> Decreasing the batch size for UDFs sounds reasonable to me, together with other tricks to reduce the data sitting in the socket/pipe buffers.
>>
>> BTW, what do your UDFs look like? How about using Jython to run simple Python UDFs (those without external libraries)?
>>
>> On Tue, Jun 23, 2015 at 8:21 PM, Justin Uang <justin.u...@gmail.com> wrote:
>>
>>> // + punya
>>>
>>> Thanks for your quick response!
>>>
>>> I'm not sure that using an unbounded buffer is a good solution to the locking problem. For example, in the situation where I had 500 columns, I am in fact storing 499 extra columns on the Java side, which might make me OOM if I have to store many rows. In addition, if I am using an AutoBatchedSerializer, the Java side might have to write 1 << 16 == 65536 rows before Python starts outputting elements, in which case the Java side has to buffer 65536 complete rows. In general it seems fragile to rely on blocking behavior in the Python coprocess. By contrast, it's very easy to verify the correctness and performance characteristics of the synchronous blocking solution.
>>>
>>> On Tue, Jun 23, 2015 at 7:21 PM Davies Liu <dav...@databricks.com> wrote:
>>>
>>>> Thanks for looking into it. I like the idea of having a ForkingIterator. If it has an unbounded buffer, then I think we will not have the deadlock problem. The writing thread will be blocked by the Python process, so not many rows should end up buffered (though this could still cause an OOM). At least, this approach is better than the current one.
>>>>
>>>> Could you create a JIRA and send out the PR?
>>>>
>>>> On Tue, Jun 23, 2015 at 3:27 PM, Justin Uang <justin.u...@gmail.com> wrote:
>>>>
>>>>> BLUF: BatchPythonEvaluation's implementation is unusable at large scale, but I have a proof-of-concept implementation that avoids caching the entire dataset.
>>>>>
>>>>> Hi,
>>>>>
>>>>> We have been running into performance problems using Python UDFs with DataFrames at large scale.
>>>>>
>>>>> From the implementation of BatchPythonEvaluation, it looks like the goal was to reuse the PythonRDD code. It caches the entire child RDD so that it can do two passes over the data: one to feed the PythonRDD, then one to join the Python lambda results with the original rows (which may contain Java objects that should be passed through).
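To make the two-pass shape concrete, here is a simplified Scala sketch; evalInPython stands in for the PythonRDD round trip, and the names and row layout are illustrative, not the actual BatchPythonEvaluation code:

    import org.apache.spark.rdd.RDD
    import org.apache.spark.storage.StorageLevel

    // Simplified sketch of the two-pass pattern: the child must be fully
    // materialized because it is consumed twice, once to feed python and
    // once to zip the results back onto the original rows.
    def twoPassSketch(child: RDD[Array[Any]],
                      evalInPython: RDD[Array[Any]] => RDD[Any]): RDD[Array[Any]] = {
      val cached = child.persist(StorageLevel.MEMORY_AND_DISK) // entire child kept
      val udfResults = evalInPython(cached)  // pass 1: rows to the python worker
      cached.zip(udfResults)                 // pass 2: re-read rows for the join
        .map { case (row, result) => row :+ result }
    }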
>>>>> In addition, it caches all the columns, even the ones that don't need to be processed by the Python UDF. In the cases I was working with, I had a 500-column table, I wanted to use a Python UDF for one column, and it ended up caching all 500 columns.
>>>>>
>>>>> I have a working solution over here that does it in one pass over the data, avoiding the caching (https://github.com/justinuang/spark/commit/c1a415a18d31226ac580f1a9df7985571d03199b). With this patch, I go from a job that takes 20 minutes and then OOMs to a job that finishes completely in 3 minutes. It is indeed quite hacky and prone to deadlocks, since there is buffering in many locations:
>>>>>
>>>>> - NEW: the ForkingIterator LinkedBlockingDeque
>>>>> - batching the rows before pickling them
>>>>> - OS buffers on both sides
>>>>> - pyspark.serializers.BatchedSerializer
>>>>>
>>>>> We can avoid deadlock by being very disciplined. For example, we can have the ForkingIterator always check whether the LinkedBlockingDeque is full and, if so:
>>>>>
>>>>> Java
>>>>> - flush the Java pickling buffer
>>>>> - send a flush command to the Python process
>>>>> - flush the OS-level stream on the Java side
>>>>>
>>>>> Python
>>>>> - flush the BatchedSerializer
>>>>> - os.flush()
>>>>>
>>>>> I haven't added this yet, and it is getting very complex. Another model would be to change the protocol between the Java side and the worker to a synchronous request/response (a rough sketch of what this could look like appears after the quoted thread). This has the disadvantage that the CPU isn't doing anything while a batch is being sent across, but it has the huge advantage of simplicity. In addition, I suspect the bottleneck isn't the actual IO between the processes, but rather the serialization of Java objects into pickled bytes, plus the deserialization/serialization and the Python loops on the Python side. Another advantage is that we won't use more than 100% CPU, since only one thread at a time is doing CPU work between the executor and the Python interpreter.
>>>>>
>>>>> Any thoughts would be much appreciated =)
>>>>>
>>>>> Other improvements:
>>>>> - extract some of the worker code out of PythonRDD so that we can do a mapPartitions directly in BatchedPythonEvaluation, without resorting to the hackery in ForkedRDD.compute(), which uses a cache to ensure that the other RDD can get a handle to the same iterator
>>>>> - read elements and use a size estimator when creating the BlockingQueue, to make sure we don't hold too much in memory while batching
>>>>> - patch the Unpickler to not use StopException for control flow, which is slowing down the Java side
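To illustrate the synchronous request/response model proposed above, a minimal Scala sketch of the JVM side, assuming a simple length-prefixed framing; the framing, names, and types here are hypothetical, not an actual Spark or pyspark wire format:

    import java.io.{DataInputStream, DataOutputStream}

    // Hypothetical synchronous protocol: write one batch of pickled rows,
    // flush, then block until the worker answers. At most one batch is in
    // flight at a time, so there is no buffering to reason about.
    def evalBatch(out: DataOutputStream,
                  in: DataInputStream,
                  batch: Array[Array[Byte]]): Array[Array[Byte]] = {
      out.writeInt(batch.length)     // request header: number of rows
      batch.foreach { row =>
        out.writeInt(row.length)     // each pickled row is length-prefixed
        out.write(row)
      }
      out.flush()                    // hand the whole batch to python

      val n = in.readInt()           // response header: number of results
      Array.fill(n) {
        val result = new Array[Byte](in.readInt())
        in.readFully(result)         // block until the row is fully read
        result
      }
    }

The trade-off is the one described in the thread: while a batch is crossing the pipe, neither side does useful work, but the deadlock analysis disappears because neither side ever queues more than one outstanding batch.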