Had a quick look at your commit; I think that makes sense. Could you send a PR for that? Then we can review it.
In order to support 2), we need to change the serialized Python function from `f(iter)`, which processes a whole partition, to `f(x)`, which processes one row at a time. Then we can easily combine them: for f1(f2(x)) and g1(g2(x)), we can do this in Python:

    for row in reading_stream:
        x1, x2 = row
        y1 = f1(f2(x1))
        y2 = g1(g2(x2))
        yield (y1, y2)

For RDDs we still need to use `f(iter)`, but for SQL UDFs we can use `f(x)`.
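To make that concrete, here is a rough sketch of how the worker side could combine independent row-level chains; `compose` and `combine_udfs` are hypothetical helpers for illustration, not existing pyspark.worker code:

    # Hypothetical sketch: build one partition-level function out of several
    # independent chains of row-level UDFs, so they all run in one worker.
    def compose(*fs):
        """compose(f1, f2)(x) == f1(f2(x))"""
        def composed(x):
            for f in reversed(fs):
                x = f(x)
            return x
        return composed

    def combine_udfs(chains):
        """Turn a list of UDF chains (each a list of row-level functions)
        into a single `f(iter)` suitable for the existing worker loop."""
        composed = [compose(*chain) for chain in chains]
        def func(iterator):
            for row in iterator:
                # one input column per chain in, one output column per chain out
                yield tuple(f(x) for f, x in zip(composed, row))
        return func

    # combine_udfs([[f1, f2], [g1, g2]]) behaves exactly like the loop above:
    # for each row (x1, x2) it yields (f1(f2(x1)), g1(g2(x2))).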
On Sun, Jan 31, 2016 at 1:37 PM, Justin Uang <justin.u...@gmail.com> wrote:
> Hey guys,
>
> BLUF: sorry for the length of this email; I am trying to figure out how to
> batch Python UDF executions, and since this is my first time messing with
> Catalyst, I would like any feedback.
>
> My team is starting to use PySpark UDFs quite heavily, and performance is
> a huge blocker. The extra round-trip serialization from Java to Python is
> not a huge concern if we only incur it ~once per column for most
> workflows, since it'll be in the same order of magnitude as reading files
> from disk. However, right now each Python UDF leads to its own round
> trip. There is definitely a lot we can do about this:
>
> (all the prototyping code is here:
> https://github.com/justinuang/spark/commit/8176749f8a6e6dc5a49fbbb952735ff40fb309fc)
>
> 1. We can't chain Python UDFs.
>
>     df.select(python_times_2(python_times_2("col1")))
>
> throws an exception saying that the inner expression isn't evaluable. The
> workaround is to do
>
>     df.select(python_times_2("col1").alias("tmp")).select(python_times_2("tmp"))
>
> This can be solved in ExtractPythonUDFs by always extracting the innermost
> Python UDF first.
>
>     // Pick the UDF we are going to evaluate (TODO: Support evaluating
>     // multiple UDFs at a time)
>     // If there is more than one, we will add another evaluation
>     // operator in a subsequent pass.
>     - udfs.find(_.resolved) match {
>     + udfs.find { udf =>
>     +   udf.resolved && udf.children.map { child: Expression =>
>     +     // really hacky way to find if a child of a udf has the PythonUDF node
>     +     child.find {
>     +       case p: PythonUDF => true
>     +       case _ => false
>     +     }.isEmpty
>     +   }.reduce((x, y) => x && y)
>     + } match {
>         case Some(udf) =>
>           var evaluation: EvaluatePython = null
>
> 2. If we have a Python UDF applied to many different columns, where they
> don't depend on each other, we can optimize them by collapsing them down
> into a single Python worker. Although we have to serialize and send the
> same amount of data to the Python interpreter, in the case where I am
> applying the same function to 20 columns, the overhead/context switches
> of having 20 interpreters running at the same time cause huge performance
> hits. I have confirmed this by manually taking the 20 columns, converting
> them to a struct, and then writing a UDF that processes the struct in one
> pass; the speed difference is 2x. My approach to adding this to Catalyst
> is basically to write an optimizer rule called CombinePython which joins
> adjacent EvaluatePython nodes that don't depend on each other's
> variables, and then to have BatchPythonEvaluation run multiple lambdas at
> once. I would also like to be able to handle the case
>
>     df.select(python_times_2("col1").alias("col1x2"))
>       .select(F.col("col1x2"), python_times_2("col1x2").alias("col1x4"))
>
> To get around that, I add a PushDownPythonEvaluation optimizer rule that
> pushes the evaluation through a select/project, so that the CombinePython
> rule can join the two.
>
> 3. I would like CombinePython to be able to handle UDFs that chain off of
> each other.
>
>     df.select(python_times_2(python_times_2("col1")))
>
> I haven't prototyped this yet, since it's a lot more complex. The way I'm
> thinking about this is to still have a rule called CombinePython, except
> that BatchPythonEvaluation will need to be smart enough to build up the
> DAG of dependencies, and then feed that information to the Python
> interpreter, so it can compute things in the right order and reuse the
> in-memory objects that it has already computed. Does this seem right?
> Should the code mainly be in BatchPythonEvaluation? In addition, we will
> need to change the protocol between the Java and Python sides to support
> sending this information. What is acceptable?
>
> Any help would be much appreciated! Especially w.r.t. the design choices,
> so that the PR has a chance of being accepted.
>
> Justin
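For 3), once the serialized functions are row-level, the Python side could stay fairly simple. A rough sketch, assuming a hypothetical wire format in which the JVM ships a topologically sorted list of (output_name, function, input_names) steps; none of these names exist in Spark today:

    # Hypothetical sketch of per-row DAG evaluation for chained UDFs.
    def make_dag_evaluator(steps, output_names):
        def evaluate(iterator):
            for row in iterator:           # row arrives as {column: value}
                values = dict(row)         # copy so inputs are preserved
                for name, func, inputs in steps:
                    # intermediates land in `values`, so a result shared by
                    # several UDFs is computed only once per row
                    values[name] = func(*(values[i] for i in inputs))
                yield tuple(values[n] for n in output_names)
        return evaluate

    # For df.select(python_times_2(python_times_2("col1"))), the steps could
    # be [("t1", times_2, ["col1"]), ("t2", times_2, ["t1"])] with
    # output_names ["t2"]: "col1" crosses the Java/Python boundary once.

The hard part would then be on the JVM side: building that sorted step list in BatchPythonEvaluation and extending the worker protocol to ship it.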