Thanks, Weston. I was able to build a local PoC of this that works for UDFs, with some limitations - see the details below. I'd like to get feedback on whether this approach is acceptable; if so, I could start contributing its components.
With respect to Li's design points:

1. A way to serialize the user function (and function parameters): this works using cloudpickle.dumps(f), where f is the Python UDF function (a round-trip sketch follows below).

2. Add a Substrait relation/expression that captures the UDF: this works using a new "UserDefinedFunction" protobuf message under the "ExtensionFunction" message. This also provides fail-safe behavior for legacy Substrait consumers: a consumer unaware of the "UserDefinedFunction" message errors out because it cannot find the described function by name, since the Substrait producer is required to make that name different from all the names in the default extension-id-registry.

3. Deserialize the Substrait relation/expression in Arrow compute and execute the UDF: this works by extending the current Scalar UDF prototype with automatic registration driven by the UDF function declarations embedded in the Substrait plan. The function extension registration is done in a per-plan scope, without modifying the global/default extension-id-registry and while avoiding cross-plan naming conflicts (a registration sketch follows below).

With respect to my design points:

1. There was no need to support a serialized UDF at the Ibis level; the existing vectorized UDF support was sufficient for capturing UDFs.

2. As planned, Ibis-Substrait is extended to support an extension function of type Python UDF.

3. Almost as planned, PyArrow is extended to process a Substrait plan that includes serialized Python UDFs, except that the result is a record-batch reader rather than a table as originally planned (a consumer sketch follows below).

Currently, the PoC only supports flat, scalar-valued, element-wise Python UDFs. I expect that later adding support for table-/dataset-/struct-valued and analytic/reduction-type Python UDFs should not be too hard.
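For concreteness, here is a minimal sketch of the serialization round trip from Li's point 1. It assumes only cloudpickle's public API; the function name add_one and its body are illustrative, and the (ctx, x) signature follows the Scalar UDF prototype's calling convention:

    import cloudpickle
    import pyarrow.compute as pc

    def add_one(ctx, x):
        # Element-wise scalar UDF: ctx is the UDF context supplied by
        # the engine, x is an Arrow array (or scalar) argument.
        return pc.add(x, 1)

    # Producer side: serialize the function (and what it closes over)
    # to bytes, suitable for embedding in the plan's protobuf message.
    payload = cloudpickle.dumps(add_one)

    # Consumer side: reconstruct the callable in the executing process.
    restored = cloudpickle.loads(payload)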
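Here is a minimal sketch of the registration step from Li's point 3, using the temporary-name idea from the quoted thread below. I'm assuming the Scalar UDF prototype's register_scalar_function API; the helper name register_plan_udf, the UUID naming scheme, and the fixed int64 types are all illustrative. Note the sketch uses the process-wide registry plus a unique name, whereas the PoC itself keeps the registration in a per-plan scope:

    import uuid
    import cloudpickle
    import pyarrow as pa
    import pyarrow.compute as pc

    def register_plan_udf(payload):
        # Reconstruct the UDF shipped inside the Substrait plan.
        func = cloudpickle.loads(payload)
        # A UUID suffix keeps the name distinct from everything in the
        # default extension-id-registry and from other in-flight plans.
        name = "plan_udf_" + uuid.uuid4().hex
        pc.register_scalar_function(
            func,
            name,
            {"summary": "plan-scoped Python UDF",
             "description": "deserialized from a Substrait plan"},
            {"x": pa.int64()},  # input types, fixed here for brevity
            pa.int64(),         # output type
        )
        return name  # the plan's expressions refer to the UDF by name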
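And a sketch of the consumer step from my point 3. The entry point name is an assumption on my part based on [1] below (pyarrow.substrait.run_query); the plan file path is a placeholder:

    import pyarrow.substrait as substrait

    # Load a serialized Substrait plan produced upstream, e.g. by
    # Ibis-Substrait; "plan.pb" is a placeholder path.
    with open("plan.pb", "rb") as f:
        plan_bytes = f.read()

    # Consume the plan; the result is a RecordBatchReader rather than
    # a Table, so the caller can stream batches or materialize all.
    reader = substrait.run_query(plan_bytes)
    table = reader.read_all()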
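Regarding the facade-registry idea Weston raises in the quoted thread below, here is a rough Python sketch of the intended lookup semantics (plan-local names shadow the process-wide registry). The class and method names are mine, not Arrow's, and the real implementation would live in C++:

    import pyarrow.compute as pc

    class FacadeRegistry:
        # Illustrative only: a plan-scoped registry layered over the
        # process-wide one; dropped when its plan finishes.

        def __init__(self):
            self._local = {}  # name -> plan-local Python UDF

        def register_local(self, name, func):
            # Plan-local registration; never touches global state.
            self._local[name] = func

        def lookup(self, name):
            if name in self._local:
                return self._local[name]
            # Fall back to the process-wide registry; this raises if
            # the name is unknown there as well.
            return pc.get_function(name)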
Yaron.

________________________________
From: Li Jin <ice.xell...@gmail.com>
Sent: Tuesday, May 17, 2022 4:46 PM
To: dev@arrow.apache.org <dev@arrow.apache.org>
Subject: Re: design for Python UDFs in an Ibis/Substrait/Arrow workflow

Thanks Weston.

> Good questions. I haven't given this a whole lot of thought yet, so
> take these suggestions with a grain of salt. Probably the minimal
> code change approach would be to implement [2] and call the unregister
> from the ExecPlan's destructor. However, this is going to eventually
> lead to naming conflicts if multiple UDFs are running in parallel. We
> could generate unique names to work around this. We could also
> consider a second function registry, unique to the plan, combined with
> the master function registry via some kind of facade registry that
> prefers local UDFs and falls back to process UDFs. I think that
> might be the best long-term solution.

If I understand this correctly, with [1] we can potentially deserialize the UDF in the Python process calling the Python consumer API (likely the user's Python process), call the PyArrow API to register the UDF (maybe with a temporary name containing a UUID to avoid conflicts), and then construct a UDF ExecNode with that name? I think this would work, but I am not sure if I missed anything.

I think a second registry would help and be cleaner, but for the current MVP purpose the temporary name + unregister from the destructor probably works.

On Mon, May 16, 2022 at 9:53 PM Weston Pace <weston.p...@gmail.com> wrote:

> 1. This is probably best discussed on the Ibis mailing list (or
> Github? I'm not sure how the Ibis community communicates).
>
> 2. If we consider "encoding the embedded function in Substrait" to be
> part of (2), then this can be discussed in the Substrait community.
> That being said, in the sync call last week we had a brief discussion,
> and the plan is to start with the EmbeddedFunction message, add an
> "any" version, and then propose a PR to standardize it. We will need
> to find some spot to encode the embedded function into the plan
> itself, however.
>
> 3. A PR is in progress (pretty close to merging) to expose the
> Substrait consumer to Python. I think this will be good enough for
> MVP purposes. Check out [1].
>
> > we are not sure how to proceed with (3), so wondering how to
> > register and execute the deserialized UDF from Substrait using the
> > existing UDF code. Any thoughts?
>
> Good questions. I haven't given this a whole lot of thought yet, so
> take these suggestions with a grain of salt. Probably the minimal
> code change approach would be to implement [2] and call the unregister
> from the ExecPlan's destructor. However, this is going to eventually
> lead to naming conflicts if multiple UDFs are running in parallel. We
> could generate unique names to work around this. We could also
> consider a second function registry, unique to the plan, combined with
> the master function registry via some kind of facade registry that
> prefers local UDFs and falls back to process UDFs. I think that
> might be the best long-term solution.
>
> [1] https://github.com/apache/arrow/pull/12672
> [2] https://issues.apache.org/jira/browse/ARROW-16211
>
> On Mon, May 16, 2022 at 5:06 AM Li Jin <ice.xell...@gmail.com> wrote:
> >
> > For context, this is a continuation of a previous email discussion,
> > "RPC: Out of Process Python UDFs in Arrow Compute", where we
> > identified reasonable steps to solve:
> >
> > (1) A way to serialize the user function (and function parameters)
> > (in the doc I proposed a cloudpickle-based approach, similar to how
> > Spark does it)
> > (2) Add a Substrait relation/expression that captures the UDF
> > (3) Deserialize the Substrait relation/expression in Arrow compute
> > and execute the UDF (either using the approach in the current Scalar
> > UDF prototype or doing something else)
> >
> > (Same as Yaron's layout above.)
> >
> > Now I think we have reasonable solutions for (1) and (2) (at least
> > for PoC purposes), but we are not sure how to proceed with (3), so
> > wondering how to register and execute the deserialized UDF from
> > Substrait using the existing UDF code. Any thoughts?
> >
> > Li
> >
> > On Sun, May 15, 2022 at 8:02 AM Yaron Gvili <rt...@hotmail.com> wrote:
> > >
> > > Hi,
> > >
> > > I'm working on a Python UDFs PoC and would like to get the
> > > community's feedback on its design.
> > >
> > > The goal of this PoC is to enable a user to integrate Python UDFs
> > > in an Ibis/Substrait/Arrow workflow. The basic idea is that the
> > > user would create an Ibis expression that includes Python UDFs
> > > implemented by the user, then use a single invocation to:
> > >
> > > 1. produce a Substrait plan for the Ibis expression;
> > > 2. deliver the Substrait plan to Arrow;
> > > 3. consume the Substrait plan in Arrow to obtain an execution plan;
> > > 4. run the execution plan;
> > > 5. deliver back the resulting table outputs of the plan.
> > >
> > > I already have the above steps implemented, with support for some
> > > existing execution nodes and compute functions in Arrow, but
> > > without the support for Python UDFs, which is the focus of this
> > > discussion.
> > > To add this support, I am thinking about a design where:
> > >
> > >   * Ibis is extended to support an expression of type Python UDF,
> > > which carries (text-form) serialized UDF code (likely using
> > > cloudpickle)
> > >   * Ibis-Substrait is extended to support an extension function of
> > > type Python UDF, which also carries (text-within-protobuf-form)
> > > serialized UDF code
> > >   * PyArrow is extended to process a Substrait plan that includes
> > > serialized Python UDFs using these steps:
> > >       * Deserialize the Python UDFs
> > >       * Register the Python UDFs (preferably, in a local scope
> > > that is cleared after processing ends)
> > >       * Run the execution plan corresponding to the Substrait plan
> > >       * Return the resulting table outputs of the plan
> > >
> > > Note that this design requires the user to drive PyArrow. It does
> > > not support driving Arrow C++ to process such a Substrait plan (it
> > > does not start an embedded Python interpreter).
> > >
> > > Cheers,
> > > Yaron.