Thanks, Weston.

I was able to locally build a PoC of this to work for UDFs with some 
limitations - see details below. I'd like to get feedback about whether this 
approach is acceptable. If so, I could start contributing its components.

With respect to Li's design points:

  1.  A way to serialize the user function (and function parameters): This 
works using cloudpickle.dumps(f) where f is the Python UDF function.
  2.  Add a Substrait relation/expression that captures the UDF: This works 
using a new "UserDefinedFunction" protobuf message under the 
"ExtensionFunction" message. This also provides fail-safety for legacy 
Substrait-consumers - if the Substrait-consumer is unaware of the 
"UserDefinedFunction" message then it would error due to not finding the 
described function by name, which the Substrait-producer is required to make 
different from all the names in the default extension-id-registry.
  3.  Deserialize the Substrait relation/expression in Arrow compute and 
execute the UDF: This works by extending the current Scalar UDF prototype with 
automatic registration given the UDF function declarations embedded in the 
Substrait plan. The function extension registration is done in a per-plan 
scope, without modifying the global/default extension-if-registry and while 
avoiding cross-plan naming-conflicts.

With respect to my design points:

  1.  There was no need to support a serialized UDF at the Ibis level; the 
existing vectorized UDF support was sufficient for capturing UDFs.
  2.  As planned, Ibis-Substrait is extended to support an extension function 
of type Python UDF.
  3.  Almost as planned, PyArrow is extended to process a Substrait plan that 
includes serialized Python UDFs, yet the result is a record-batch-reader, 
rather than a table as planned.

Currently, the PoC only supports flat scalar-valued element-wise-type Python 
UDFs. I expect that adding later on support for table/dataset/struct-valued and 
analytic/reduction-type Python UDFs should not be too hard.


Yaron.
________________________________
From: Li Jin <ice.xell...@gmail.com>
Sent: Tuesday, May 17, 2022 4:46 PM
To: dev@arrow.apache.org <dev@arrow.apache.org>
Subject: Re: design for Python UDFs in an Ibis/Substrait/Arrow workflow

Thanks Weston.

> Good questions.  I haven't given this a whole lot of thought yet so
> take these suggestions with a grain of salt.  Probably the minimal
> code change approach would be to implement [2] and call the unregister
> from the ExecPlan's destructor.  However, this is going to eventually
> lead to naming conflicts if multiple UDFs are running in parallel.  We
> could generate unique names to work around this.  We could also
> consider a second function registry, unique to the plan, combined with
> the master function registry via some kind of facade registry that
> prefer's local UDFs and falls back to process UDFs.  I think that
> might be the best long term solution.

If I understand this correctly, with the [1], we can potentially
deserialize the
UDF in the Python process calling the Python consumer API (likely to be the
user Python process), call PyArrow API to register the UDF, (maybe with a
temporary
name with UUID to avoid conflict) then construct a UDF ExecNode with that
name?
I think this would not but not sure if I missed anything. I think a second
registry would
help and be more clean, but with the current MVP purpose the temporary name
+ unregister
from destructor probably works.

On Mon, May 16, 2022 at 9:53 PM Weston Pace <weston.p...@gmail.com> wrote:

> 1. This is probably best discussed on the Ibis mailing list (or
> Github?  I'm not sure how the Ibis community communicates).
>
> 2. If we consider "encoding the embedded function in Substrait" to be
> part of (2) then this can be discussed in the Substrait community.
> That being said, in the sync call last week we had a brief discussion
> and to start with the EmbeddedFunction message and add an "any"
> version and then propose a PR to standardize it.  We will need to find
> some spot to encode the embedded function into the plan itself
> however.
>
> 3. A PR is in progress (pretty close to merging) to expose the
> Substrait consumer to python.  I think this will be good enough for
> MVP purposes.  Check out [1].
>
> > we are not sure how to proceed with (3), so wondering how to
> > register/execute the deserialize the UDF from substrait and execute it
> > using existing UDF code. Any thoughts?
>
> Good questions.  I haven't given this a whole lot of thought yet so
> take these suggestions with a grain of salt.  Probably the minimal
> code change approach would be to implement [2] and call the unregister
> from the ExecPlan's destructor.  However, this is going to eventually
> lead to naming conflicts if multiple UDFs are running in parallel.  We
> could generate unique names to work around this.  We could also
> consider a second function registry, unique to the plan, combined with
> the master function registry via some kind of facade registry that
> prefer's local UDFs and falls back to process UDFs.  I think that
> might be the best long term solution.
>
> [1] https://github.com/apache/arrow/pull/12672
> [2] https://issues.apache.org/jira/browse/ARROW-16211
>
> On Mon, May 16, 2022 at 5:06 AM Li Jin <ice.xell...@gmail.com> wrote:
> >
> > For context, this is a continuation of a previous email discussion: "RPC:
> > Out of Process Python UDFs in Arrow Compute" where we identified
> reasonable
> > steps to solve are:
> > ostef Process Python UDFs
> > (1) A way to serialize the user function (and function parameters) (in
> the
> > doc I proposed to use cloudpickle based approach similar to how Spark
> does
> > it) (2) Add a Substrait relation/expression that captures the the UDF (3)
> > Deserialize the Substrait relation/expression in Arrow compute and
> execute
> > the UDF (either using the approach in the current Scalar UDF prototype or
> > do sth else)
> > (Same as the Yaron layout above).
> >
> > Now I think we have reasonable solutions are (1) and (2) (at least for
> PoC
> > purpose), but we are not sure how to proceed with (3), so wondering how
> to
> > register/execute the deserialize the UDF from substrait and execute it
> > using existing UDF code. Any thoughts?
> >
> > Li
> >
> >
> > On Sun, May 15, 2022 at 8:02 AM Yaron Gvili <rt...@hotmail.com> wrote:
> >
> > > Hi,
> > >
> > > I'm working on a Python UDFs PoC and would like to get the community's
> > > feedback on its design.
> > >
> > > The goal of this PoC is to enable a user to integrate Python UDFs in an
> > > Ibis/Substrait/Arrow workflow. The basic idea is that the user would
> create
> > > an Ibis expression that includes Python UDFs implemented by the user,
> then
> > > use a single invocation to:
> > >
> > >   1.  produce a Substrait plan for the Ibis expression;
> > >   2.  deliver the Substrait plan to Arrow;
> > >   3.  consume the Substrait plan in Arrow to obtain an execution plan;
> > >   4.  run the execution plan;
> > >   5.  deliver back the resulting table outputs of the plan.
> > >
> > > I already have the above steps implemented, with support for some
> existing
> > > execution nodes and compute functions in Arrow, but without the
> support for
> > > Python UDFs, which is the focus of this discussion. To add this
> support I
> > > am thinking about a design where:
> > >
> > >   *   Ibis is extended to support an expression of type Python UDF,
> which
> > > carries (text-form) serialized UDF code (likely using cloudpickle)
> > >   *   Ibis-Substrait is extended to support an extension function of
> type
> > > Python UDF, which also carries (text-within-protobuf-form) serialized
> UDF
> > > code
> > >   *   PyArrow is extended to process a Substrait plan that includes
> > > serialized Python UDFs using these steps:
> > >      *   Deserialize the Python UDFs
> > >      *   Register the Python UDFs (preferably, in a local scope that is
> > > cleared after processing ends)
> > >      *   Run the execution plan corresponding to the Substrait plan
> > >      *   Return the resulting table outputs of the plan
> > >
> > > Note that this design requires the user to drive PyArrow. It does not
> > > support driving Arrow C++ to process such a Substrait plan (it does not
> > > start an embedded Python interpreter).
> > >
> > >
> > > Cheers,
> > > Yaron.
> > >
>

Reply via email to