Had a quick look at your commit, and I think that makes sense. Could you
send a PR for it, then we can review it?

In order to support 2), we need to change the serialized Python
function from `f(iter)` to `f(x)`, processing one row at a time (not a
partition); then we can easily combine them together.

For f1(f2(x)) and g1(g2(x)), we can do this in Python:

for row in reading_stream:
    x1, x2 = row
    y1 = f1(f2(x1))
    y2 = g1(g2(x2))
    yield (y1, y2)

For RDD, we still need to use `f(iter)`, but for SQL UDF, use `f(x)`.
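
To make the two cases concrete, here is a minimal sketch (the helper names
are made up, just for illustration): row-level functions compose directly,
and a row-level `f(x)` can still be wrapped back into the `f(iter)` form
used for RDDs:

def compose(f, g):
    # chain two row-level UDFs: x -> f(g(x))
    return lambda x: f(g(x))

def as_iter_func(f):
    # adapt a row-level f(x) back to the f(iter) form used for RDDs
    def func(iterator):
        for x in iterator:
            yield f(x)
    return func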

On Sun, Jan 31, 2016 at 1:37 PM, Justin Uang <justin.u...@gmail.com> wrote:
> Hey guys,
>
> BLUF: sorry for the length of this email, trying to figure out how to batch
> Python UDF executions, and since this is my first time messing with
> catalyst, would like any feedback
>
> My team is starting to use PySpark UDFs quite heavily, and performance is a
> huge blocker. The extra roundtrip serialization from Java to Python is not a
> huge concern if we only incur it ~once per column for most workflows, since
> it'll be in the same order of magnitude as reading files from disk. However,
> right now each Python UDF leads to its own roundtrip. There is definitely a
> lot we can do regarding this:
>
> (all the prototyping code is here:
> https://github.com/justinuang/spark/commit/8176749f8a6e6dc5a49fbbb952735ff40fb309fc)
>
> 1. We can't chain Python UDFs.
>
>     df.select(python_times_2(python_times_2("col1")))
>
> throws an exception saying that the inner expression isn't evaluable. The
> workaround is to do
>
>     df.select(python_times_2("col1").alias("tmp")).select(python_times_2("tmp"))
>
> This can be solved in ExtractPythonUDFs by always extracting the innermost
> Python UDF first.
>
>          // Pick the UDF we are going to evaluate
>          // (TODO: Support evaluating multiple UDFs at a time)
>          // If there is more than one, we will add another evaluation
>          // operator in a subsequent pass.
> -        udfs.find(_.resolved) match {
> +        udfs.find { udf =>
> +          udf.resolved && udf.children.map { child: Expression =>
> +            // really hacky way to find if a child of a udf has the PythonUDF node
> +            child.find {
> +              case p: PythonUDF => true
> +              case _ => false
> +            }.isEmpty
> +          }.reduce((x, y) => x && y)
> +        } match {
>            case Some(udf) =>
>              var evaluation: EvaluatePython = null
>
> 2. If we have a Python UDF applied to many different columns, where they
> don't depend on each other, we can optimize them by collapsing them down
> into a single python worker. Although we have to serialize and send the same
> amount of data to the python interpreter, in the case where I am applying
> the same function to 20 columns, the overhead/context switches of having 20
> interpreters run at the same time cause huge performance hits. I have
> confirmed this by manually taking the 20 columns, converting them to a
> struct, and then writing a UDF that processes the struct all at once (rough
> sketch below), and the speed difference is 2x. My approach to adding this to
> catalyst is basically to write an optimizer rule called CombinePython which
> joins adjacent EvaluatePython nodes that don't depend on each other's
> variables, and then to have BatchPythonEvaluation run multiple lambdas at
> once. I would also like to be able to handle the case
> df.select(python_times_2("col1").alias("col1x2")).select(F.col("col1x2"),
> python_times_2("col1x2").alias("col1x4")). To get around that, I add a
> PushDownPythonEvaluation optimizer rule that pushes the evaluation through a
> select/project, so that the CombinePython rule can join the two.
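>
> For reference, the manual struct workaround looks roughly like this (the
> column names, the times_2 lambda, and df are placeholders, not the real
> workflow):
>
>     from pyspark.sql import functions as F
>     from pyspark.sql.types import ArrayType, DoubleType
>
>     cols = ["col%d" % i for i in range(20)]
>
>     # one UDF per column: each column pays for its own python worker
>     times_2 = F.udf(lambda x: x * 2, DoubleType())
>     slow = df.select([times_2(c).alias(c) for c in cols])
>
>     # pack the columns into a struct and process them in a single UDF
>     times_2_packed = F.udf(lambda row: [v * 2 for v in row],
>                            ArrayType(DoubleType()))
>     fast = df.select(times_2_packed(F.struct(*cols)).alias("packed"))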
>
> 3. I would like CombinePython to be able to handle UDFs that chain off of
> each other.
>
>     df.select(python_times_2(python_times_2("col1")))
>
> I haven't prototyped this yet, since it's a lot more complex. The way I'm
> thinking about this is to still have a rule called CombinePython, except
> that BatchPythonEvaluation will need to be smart enough to build up the
> DAG of dependencies, and then feed that information to the python
> interpreter, so it can compute things in the right order and reuse the
> in-memory objects that it has already computed (rough sketch below). Does
> this seem right? Should the code mainly be in BatchPythonEvaluation? In
> addition, we will need to change the protocol between the java and python
> sides to support sending this information. What is acceptable?
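>
> To make that concrete, a rough sketch of the worker-side loop I have in
> mind (the names and the wire format are made up; times_2 stands in for a
> deserialized lambda):
>
>     times_2 = lambda x: x * 2
>
>     # the java side would send each UDF together with the names of its
>     # inputs, in topological order
>     udf_dag = [
>         ("tmp", times_2, ["col1"]),   # inner UDF
>         ("out", times_2, ["tmp"]),    # outer UDF reuses the inner result
>     ]
>
>     def eval_row(row):
>         # row: dict of column name -> value; intermediate results are
>         # cached so chained UDFs reuse in-memory objects
>         values = dict(row)
>         for name, func, inputs in udf_dag:
>             values[name] = func(*(values[i] for i in inputs))
>         return values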
>
> Any help would be much appreciated! Especially w.r.t. the design choices,
> so that the PR has a chance of being accepted.
>
> Justin

