This is a nice discussion, and it would be great to have a doc as David
suggested. I also think this would help us explore the usability side
of Python UDFs. Maybe we can start by documenting this, then build a PoC
and iterate toward an implementation.

On Tue, Jun 7, 2022 at 12:08 AM Weston Pace <weston.p...@gmail.com> wrote:

> That makes sense to me.  These are my (perhaps naive) first thoughts
> at the shape such an implementation might take...
>
> * The function returns an iterator or a context manager that returns an
> iterator
> * If a context manager then __enter__ will be called when the engine
> is ready to start receiving data from that source
> * Each call to `next()` will be made from a new I/O thread task so the
> call is free to block until data has arrived
> * The engine will not make reentrant calls to `next()`.  If the source
> wants to do readahead it needs to manage that on its own.
> * At some point the engine will be finished
>   * If a context manager then __exit__ will be called.  Otherwise the
> engine will just drop the reference so the source is eligible for gc
>   * This might happen before the iterator is exhausted (user requested
> cancellation)
>   * This cleanup will happen in the destructor of the source node
>   * The call to __exit__ should not return until all files are closed,
> resources released, etc.
>
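
A minimal sketch of the "context manager that returns an iterator" shape
described above, in case it helps the doc. Everything here is illustrative:
FakeConnection, python_source and run_engine are made-up names, plain lists
stand in for RecordBatch instances, and the toy driver is not how Acero
would actually call into Python.

```python
from contextlib import contextmanager


class FakeConnection:
    """Stand-in for a file handle or network connection (hypothetical)."""

    def __init__(self):
        self.closed = False

    def read_batches(self):
        # Plain lists stand in for RecordBatch instances.
        yield [1, 2, 3]
        yield [4, 5, 6]
        yield [7, 8, 9]

    def close(self):
        self.closed = True


@contextmanager
def python_source(conn):
    # Code before the yield plays the role of __enter__: the engine is ready.
    try:
        yield conn.read_batches()
    finally:
        # Plays the role of __exit__: runs even if the engine stops early,
        # and does not return until the resource is released.
        conn.close()


def run_engine(source_cm, max_batches=None):
    """Toy driver: one blocking next() at a time, never reentrant."""
    received = []
    with source_cm as batches:
        for batch in batches:
            received.append(batch)
            if max_batches is not None and len(received) >= max_batches:
                break  # simulated user-requested cancellation
    return received
```

Calling run_engine(python_source(conn), max_batches=2) stops before the
iterator is exhausted and still closes the connection, which is the
cancellation case from the list above.
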
> This should be doable with the current source node and building a
> special async generator for these python sources.  The Substrait
> consumer would just need to instantiate this generator.  The function
> registry wouldn't need to be involved although I could imagine
> implementations in which it is.
>
> I don't know if your producers and consumers live on the same server
> or not.  If they do not it could be a little tricky for users to test
> / develop these sources since they probably depend on
> filesystems/connections/etc. that don't exist on the producer.
>
> I don't know if "context manager that returns an iterator" is a
> regular enough python concept to be intuitive or if it would be better
> to invent a new class at that point.  The key lifecycle moments are:
>
>  * Start - Called when the engine is ready.  Today this will be called
> as soon as the engine starts but there has been discussion that, in
> the future, we might want to delay this.  For example, if connected to
> a hash-join node you might not want to start reading the probe side
> immediately and instead focus on the build side.  I don't know if this
> will ever happen but it's probably better to plan for it.  Even in the
> case where we call it at plan start it would be useful to have this in
> case we create a plan and start it much later.
>  * Next - Simple (safe to block) method to get the next batch of data
>  * Finish - Called to clean up, potentially before the source is
> fully read in the case of cancellation
>
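
If we go with a dedicated class instead, the three lifecycle moments above
might look roughly like this. It is only a sketch under my own assumptions:
the class name, the method names and the readahead queue are hypothetical,
and plain lists stand in for RecordBatch instances. It also shows a source
managing its own readahead, since the engine would not do that for it.

```python
import queue
import threading


class ReadaheadSource:
    """Hypothetical source with Start / Next / Finish methods."""

    _DONE = object()  # sentinel marking the end of the stream

    def __init__(self, make_batches, readahead=2):
        self._make_batches = make_batches  # zero-arg callable -> iterable
        self._queue = queue.Queue(maxsize=readahead)
        self._stop = threading.Event()
        self._thread = None

    def start(self):
        # Called when the engine is ready; nothing is read before this.
        self._thread = threading.Thread(target=self._produce, daemon=True)
        self._thread.start()

    def _put(self, item):
        # Retry with a timeout so a full queue cannot block cancellation.
        while not self._stop.is_set():
            try:
                self._queue.put(item, timeout=0.05)
                return True
            except queue.Full:
                continue
        return False

    def _produce(self):
        for batch in self._make_batches():
            if not self._put(batch):
                return
        self._put(self._DONE)

    def next(self):
        # Safe to block: returns the next batch, or None at end of stream.
        item = self._queue.get()
        return None if item is self._DONE else item

    def finish(self):
        # May be called before exhaustion; returns once the thread is gone.
        self._stop.set()
        if self._thread is not None:
            self._thread.join()
```

In this sketch next() must not be called again after finish().
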
> On Sun, Jun 5, 2022 at 9:54 PM Yaron Gvili <rt...@hotmail.com> wrote:
> >
> > > Any C++ extension point could become a pyarrow extension point if
> > > desired.
> >
> > Agreed. Still, the listed extension points are for supporting an
> > implementation, not integration. The existing integration code for a
> > Scalar UDF written in Python (see
> > https://github.com/apache/arrow/pull/12590) is the kind of code I have
> > in mind for supporting integration.
> >
> > > I have a few questions then.
> >
> > Before I answer each question, here's a wider view of the design I have
> > in mind. The (Python code of the) data-source UDF is intended to be
> > serialized and embedded in a Substrait plan (I already have this
> > serialization and embedding implemented locally - PR to come). The UDF
> > has/needs no arguments because its code is specific to the plan. The UDF
> > returns a new stream/generator of RecordBatch instances to be used by
> > the source-node in each execution of the plan. The UDF itself is made by
> > a UDF-maker that does get arguments, which are used to derive values
> > that are fixed into the UDF being made.
> >
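
To check my understanding of the UDF-maker idea, here is how I picture it
in plain Python. This is a sketch and not Yaron's implementation: the
function names, the path and batch_size arguments and the fake rows are all
my assumptions, and plain lists stand in for RecordBatch instances.

```python
def make_data_source_udf(path, batch_size):
    """Hypothetical UDF-maker: its arguments are fixed into the UDF it returns."""

    def data_source_udf():
        # Stateless and argument-free: every call returns a fresh generator.
        def batches():
            # Stand-in for reading `path`; a real source would open it here.
            rows = [f"{path}:{i}" for i in range(5)]
            for start in range(0, len(rows), batch_size):
                yield rows[start:start + batch_size]

        return batches()

    return data_source_udf
```

Each execution of the plan would call the UDF once and get a new generator
over the same data; only the generator carries state.
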
> > To your questions in order:
> >
> >   1.  The data-source UDF is stateless and takes no arguments. The
> >       UDF-maker does take arguments, while the RecordBatch
> >       stream/generator has state.
> >   2.  The data-source UDF does not need any information from the
> >       execution plan; it does exactly the same thing in each execution -
> >       return a new RecordBatch stream/generator for the same data. Only
> >       the source-node that holds the data-source UDF interacts with the
> >       execution plan.
> >   3.  The data-source UDF has values fixed into it (it does not receive
> >       them); the UDF-maker derives these values from its arguments. A
> >       path can certainly be included in these values. Functions like
> >       "Close" may be implemented in the RecordBatch stream/generator.
> >   4.  Using the function registry is actually not a critical part of the
> >       design I'm proposing, which is one reason I asked about it. It just
> >       seemed to me a reasonable place for the execution plan to find the
> >       data-source UDF after it gets deserialized from the Substrait plan.
> >       If the function registry is not used, then the design would need
> >       some other registry or mechanism for passing the deserialized
> >       data-source UDF to the execution plan.
> >   5.  The data-source UDF is specific to an execution plan, so
> >       definitely specific to the user who created the Substrait plan in
> >       which it is embedded. Users (or perhaps authors) can share
> >       data-source-UDF-makers, or libraries thereof, for general purpose.
> >
> > Yaron.
> > ________________________________
> > From: Weston Pace <weston.p...@gmail.com>
> > Sent: Saturday, June 4, 2022 2:41 PM
> > To: dev@arrow.apache.org <dev@arrow.apache.org>
> > Subject: Re: data-source UDFs
> >
> > > The former is about facilities (like extension points) for
> > > implementing custom data sources in Arrow whereas the latter is about
> > > facilities for integrating in PyArrow (existing or future) data sources
> > > written/wrapped in Python
> >
> > Any C++ extension point could become a pyarrow extension point if
> > desired.  For example, we already have a pure-python filesystem
> > adapter if a user wants to implement a filesystem purely in python
> > (this is how the fsspec adapter works I think).
> >
> > > I'm especially interested in feedback about the new function-kind and
> > > extensions of "cpp/src/arrow/python/udf.h" I proposed
> >
> > I have a few questions then.
> >
> > You mentioned that these functions would take no arguments.  Li's
> > rough example showed a path being provided to the UDF.
> >
> >   1. If the UDF takes no arguments then I will assume it must be
> > stateful.  That doesn't really match with the current idea of the
> > function registry.  Although it may be a problem we need to someday
> > solve for aggregate UDFs.  What would you imagine the lifecycle is for
> > this function?  If it is stateful would it not be more accurate to
> > call it something other than a function?
> >   2. If the UDF receives no arguments then do you need to communicate
> > any part of the execution plan (e.g. the Substrait plan) to the UDF?
> > How would you envision this happening?  Or is this a source that
> > always does the exact same thing regardless of the execution plan
> > (this could be valid if, for example, the user configures the source
> > with a plan independent of the execution plan before they register
> > it)?
> >   3. If the UDF is not stateful then it receives some arguments.  What
> > are they?  Does the UDF receive a path?  Or does the UDF generate
> > paths?  Are there two different kinds of UDFs, ones that receive paths
> > and ones that generate paths?
> >
> > Most of the extension points I described have more than one method.
> > Even in the APIs where I only listed one method there are some utility
> > methods like `Close` which I didn't list for simplicity.  While you
> > can generally represent a class as a "bag of functions" (a stateful
> > class is a "bag of functions that takes state as the first argument")
> > I find it to be more difficult for users to follow than just
> > registering a type with a defined interface.
> >
> >   4. Is there a particular reason in your use case for using the
> > function registry for this?
> >   5. Do you imagine these UDFs would always be specific to particular
> > users?  Or would it be possible for such a UDF to be shared as a
> > general purpose utility?
> >
> > On Sat, Jun 4, 2022 at 3:02 AM Yaron Gvili <rt...@hotmail.com> wrote:
> > >
> > > Thanks for the detailed overview, Weston. I agree with David this
> > > would be very useful to have in a public doc.
> > >
> > > Weston and David's discussion is a good one; however, I see it as
> > > separate from the discussion I brought up. The former is about
> > > facilities (like extension points) for implementing custom data sources
> > > in Arrow whereas the latter is about facilities for integrating in
> > > PyArrow (existing or future) data sources written/wrapped in Python. In
> > > this latter discussion, I'm indifferent to the complexities of data
> > > source implementation. I'm especially interested in feedback about the
> > > new function-kind and extensions of "cpp/src/arrow/python/udf.h" I
> > > proposed, as well as possible alternatives to these, and more generally
> > > in reaching consensus about how a custom data-source written/wrapped in
> > > Python would get integrated.
> > >
> > > > > At the moment as we are not exposing the execution engine
> > > > > primitives to Python users, are you expecting to expose them by
> > > > > this approach?
> > > >
> > > > From our side, these APIs are not directly exposed to the end user,
> > > > but are rather primitives that we can build on top of.
> > >
> > > For clarity of discussion, I'd suggest distinguishing between a
> > > data-source-integrator and an Acero-user (or end-user), since in many
> > > use cases these are not the same person. When I wrote user, I meant a
> > > data-source-integrator. An Acero-user would not be directly using the
> > > facilities I proposed.
> > >
> > >
> > > Yaron.
> > > ________________________________
> > > From: David Li <lidav...@apache.org>
> > > Sent: Friday, June 3, 2022 5:53 PM
> > > To: dev@arrow.apache.org <dev@arrow.apache.org>
> > > Subject: Re: data-source UDFs
> > >
> > > Thanks for the overview of the different extension points, it's nice
> > > to see this laid out. (It would be great to find a place in the docs
> > > for this, IMO, or possibly as a blog post?)
> > >
> > > Just to chime in quickly here:
> > >
> > > For databases/Flight, my hope is that integrating ADBC into Arrow
> > > Datasets will take care of both. Plain Flight isn't quite well-defined
> > > enough to be meaningfully integrated (except perhaps via a generic
> > > "stream of batches" entrypoint), and even if we wanted to feed JDBC/ODBC
> > > into an ExecPlan, we'd have to do some work that would look roughly like
> > > writing an ADBC driver, so we may as well go that route.
> > >
> > > -David
> > >
> > > On Fri, Jun 3, 2022, at 16:47, Weston Pace wrote:
> > > > Efficiently reading from a data source is something that has a bit of
> > > > complexity (parsing files, connecting to remote data sources,
> > > > managing parallel reads, etc.)  Ideally we don't want users to have
> > > > to reinvent these things as they go.  The datasets module in
> > > > Arrow-C++ has a lot of code here already.
> > > >
> > > > So I think there is probably more than one extension point.  Some of
> > > > these extension points already exist.  I do believe there is
> > > > opportunity to create further extension points as well and the
> > > > challenge / opportunity here will be figuring out what those are and
> > > > what their API should be.
> > > >
> > > > ## I can describe a little bit about what we have already:
> > > >
> > > >  * Filesystem abstraction
> > > >
> > > > Right now we have a filesystem abstraction (arrow::fs::FileSystem)
> > > > which is pretty well documented and straightforward.  This is how we
> > > > can swap between local disk, S3, etc.  From an Acero / datasets
> > > > perspective the API is basically "given a path, give me a stream of
> > > > bytes" (open file) and "given a path, give me a list of files" (list
> > > > directory).
> > > >
> > > >  * FileFormat abstraction
> > > >
> > > > The file format abstraction (arrow::dataset::FileFormat) is how we
> > > > swap out different kinds of files.  For example,
> > > > arrow/orc/parquet/csv/json/...  The API is roughly (glossing over
> > > > plenty of details)
> > > >
> > > >  - Convert input file to schema (inspect)
> > > >  - Convert input file to a stream of batches (scan)
> > > >  - Convert a stream of batches to an output file (write)
> > > >
> > > >  * Fragment / Dataset abstraction
> > > >
> > > > The fragment (arrow::dataset::Fragment) & dataset
> > > > (arrow::dataset::Dataset) APIs are how we describe a collection of
> > > > files.  This is used by the scanner to implement parallel reads.  You
> > > > can think of these as the "source" API.  The APIs are roughly
> > > >
> > > >  - Convert a dataset to a stream of fragments (list dataset)
> > > >  - Convert a fragment to a stream of batches (scan)
> > > >
> > > > The two main implementations of datasets that we have today are
> > > > FilesystemDataset (uses a filesystem to list files, each file is a
> > > > fragment.  A filesystem fragment uses a format to convert its file to
> > > > a stream of batches) and the InMemoryDataset (there is one fragment
> > > > and the scan operation is just slicing off pieces of the in-memory
> > > > data).  There are also some niche implementations here like a dataset
> > > > that is created from a python iterable of batches.  This might be
> > > > very similar to what you are describing above.
> > > >
> > > > A dataset must be created with a "dataset schema" which is the single
> > > > schema that all fragments of the dataset can be converted to.
> > > >
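
For the doc, the dataset -> fragments -> batches shape could be illustrated
with something like the toy below. It is pure Python with hypothetical
names (ListFragment, ListDataset, scan_dataset), not the
arrow::dataset classes, and it scans sequentially where the real scanner
reads in parallel. Plain lists stand in for batches.

```python
class ListFragment:
    """Hypothetical fragment: one scannable unit (e.g. one file)."""

    def __init__(self, rows):
        self._rows = rows

    def scan(self, batch_size):
        # "Convert a fragment to a stream of batches"
        for start in range(0, len(self._rows), batch_size):
            yield self._rows[start:start + batch_size]


class ListDataset:
    """Hypothetical dataset: a single schema plus a collection of fragments."""

    def __init__(self, schema, fragments):
        self.schema = schema  # the one schema every fragment converts to
        self._fragments = fragments

    def get_fragments(self):
        # "Convert a dataset to a stream of fragments"
        yield from self._fragments


def scan_dataset(dataset, batch_size):
    # Sequential stand-in for the scanner.
    for fragment in dataset.get_fragments():
        yield from fragment.scan(batch_size)
```
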
> > > > * Custom source nodes
> > > >
> > > > All of the above is exposed to Acero via the scan node which is
> > > > responsible for turning a dataset into Acero input.  However, the
> > > > datasets API could be bypassed entirely to feed Acero in other ways.
> > > > Some examples:
> > > >
> > > >  - The table source node is a way to feed in-memory data (a table)
> > > > into Acero.  This is very similar to the InMemoryDataset but bypasses
> > > > some of the overhead of the scanner.
> > > >  - The TPC-H source node generates random data for benchmarking
> > > > purposes.
> > > >
> > > > A lot of things can be expressed either as a simple dataset or as a
> > > > custom source node.  There is a bit of duplication here and I don't
> > > > know that it matters too much.  I'm just pointing this out for
> > > > pedantic purposes.
> > > >
> > > > The API here is just the ExecNode API and so the user needs to
> > > > provide something that starts when StartProducing is called and then
> > > > calls InputReceived on a regular basis.  From the discussions on the
> > > > scheduler I suspect this API may be changing slightly but the idea is
> > > > still there.
> > > >
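
The push-style contract described above could be sketched like this. These
are Python stand-ins named after the C++ methods mentioned (StartProducing,
InputReceived), not the real ExecNode signatures, and the input_finished
call is my own assumption about how end-of-stream would be signalled.

```python
class CollectingSink:
    """Hypothetical downstream node that just records what it receives."""

    def __init__(self):
        self.batches = []
        self.finished = False

    def input_received(self, batch):
        self.batches.append(batch)

    def input_finished(self):
        self.finished = True


class IteratorSourceNode:
    """Hypothetical source node wrapping a Python iterable of batches."""

    def __init__(self, batches, output):
        self._batches = batches
        self._output = output

    def start_producing(self):
        # Push model: the source drives the plan by calling its output.
        for batch in self._batches:
            self._output.input_received(batch)
        # Assumption: some end-of-stream signal exists; it is not named above.
        self._output.input_finished()
```
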
> > > > ## I'm also aware of a number of things we are still going to need at
> > > > some point.
> > > >
> > > >  * Evolution
> > > >
> > > > Sometimes different files in a dataset have different schemas.  A
> > > > very common case is fields getting added over time or fields getting
> > > > renamed or changing data type (e.g. int32 -> int64).  We have some
> > > > support for the former but none for the latter.  I've got a pretty
> > > > good idea of what the API looks like for "evolution" so if that is
> > > > something needed I could write that up.
> > > >
> > > >  * Flight dataset/fragment/source-node?
> > > >
> > > > I don't think there will be exactly a "FlightFragment".  Maybe the
> > > > right term is ADBC, but I haven't been following that discussion
> > > > closely enough.  There needs to be a way to scan a remote data source
> > > > that provides its data via a standard flight service.
> > > >
> > > >  * Sql dataset/fragment/source-node?
> > > >
> > > > It could be very useful to have a dataset that is capable of reading
> > > > data from SQL datasets via something like JDBC (although, if ADBC
> > > > connectors get built quickly enough, maybe this is never needed :)
> > > >
> > > >  * Catalogs
> > > >
> > > > The filesystem dataset currently figures out the dataset schema
> > > > through a rather expensive inspection process and it lists its
> > > > fragments using potentially expensive directory listing.  Metadata
> > > > catalogs (e.g. hive) and table formats (e.g. iceberg) often have ways
> > > > of storing precomputed versions of this information.  A dataset that
> > > > is capable of figuring out what files to scan from a catalog would be
> > > > valuable.  This dataset might use the same filesystem fragment to do
> > > > the actual scan.
> > > >
> > > >  * Table metadata
> > > >
> > > > Very similar to the catalogs discussion is the idea of "table
> > > > metadata".  This is less about reading data and more about describing
> > > > the data.  For example, metadata about any ordering of the incoming
> > > > data, unique constraints, not-null constraints, etc.  All of this
> > > > information can be used by exec nodes to simplify query processing.
> > > > For example, if you are grouping on a set of keys and one of the keys
> > > > is ordered then you can implement group by with a streaming (not
> > > > pipeline breaking) implementation.
> > > >
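
A small illustration of why ordering metadata helps, using only the Python
standard library rather than Acero. The function name is made up, and it
assumes a single grouping key whose values arrive already ordered: each
group can then be emitted as soon as the key changes, without buffering the
whole input.

```python
from itertools import groupby
from operator import itemgetter


def streaming_group_sum(rows):
    """Sum values per key; `rows` are (key, value) pairs ordered by key."""
    for key, group in groupby(rows, key=itemgetter(0)):
        # Only the current group is held; earlier results are already emitted.
        yield key, sum(value for _, value in group)
```

If the input were not ordered by key, the same key could show up in several
separate groups, so a hash-based (pipeline breaking) group by would be
needed instead.
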
> > > >> that allows utilizing existing Python APIs that know how to read a
> > > >> data source as a stream of record batches.
> > > >
> > > > We have a class called arrow::dataset::<unnamed>::OneShotFragment
> > > > which knows how to convert a python iterator into a scannable source.
> > > > This might serve your needs.  This was also written when things were
> > > > more dataset-oriented.  It might also be interesting to create a
> > > > python source node which does the same sort of thing, bypassing the
> > > > scanner, although I don't know that there would be much concrete
> > > > benefit.
> > > >
> > > > I hope this information is helpful!  It is just background though.  I
> > > > think I might need to understand your needs in a bit more detail
> > > > before I can offer any kind of prescriptive advice.
> > > >
> > > > On Fri, Jun 3, 2022 at 8:51 AM Li Jin <ice.xell...@gmail.com> wrote:
> > > >>
> > > >> Actually, "UDF" might be the wrong terminology here - This is more
> > > >> of a "custom Python data source" than "Python user defined
> > > >> functions". (Although under the hood it can probably reuse lots of
> > > >> the UDF logic to execute the custom data source)
> > > >>
> > > >> On Fri, Jun 3, 2022 at 2:49 PM Li Jin <ice.xell...@gmail.com> wrote:
> > > >>
> > > >> > What Yaron is going for is really something similar to a custom
> > > >> > data source in Spark (
> > > >> > https://levelup.gitconnected.com/easy-guide-to-create-a-custom-read-data-source-in-apache-spark-3-194afdc9627a
> > > >> > ) that allows utilizing existing Python APIs that know how to read
> > > >> > a data source as a stream of record batches.
> > > >> >
> > > >> >
> > > >> >
>
-- 
Vibhatha Abeykoon
