This is a nice discussion and it would be great to have a doc, as David suggested. I also think this would help us explore the usability piece of Python UDFs. Maybe we can start documenting, build a PoC, and iterate toward an implementation.
On Tue, Jun 7, 2022 at 12:08 AM Weston Pace <weston.p...@gmail.com> wrote:

> That makes sense to me. These are my (perhaps naive) first thoughts at the shape such an implementation might take...
>
> * The function returns an iterator or a context manager that returns an iterator
> * If a context manager, then __enter__ will be called when the engine is ready to start receiving data from that source
> * Each call to `next()` will be made from a new I/O thread task, so the call is free to block until data has arrived
> * The engine will not make reentrant calls to `next()`. If the source wants to do readahead it needs to manage that on its own.
> * At some point the engine will be finished
> * If a context manager, then __exit__ will be called. Otherwise the engine will just drop the reference so the source is eligible for gc
> * This might happen before the iterator is exhausted (user-requested cancellation)
> * This cleanup will happen in the destructor of the source node
> * The call to __exit__ should not return until all files are closed, resources released, etc.
>
> This should be doable with the current source node and building a special async generator for these python sources. The Substrait consumer would just need to instantiate this generator. The function registry wouldn't need to be involved, although I could imagine implementations in which it is.
>
> I don't know if your producers and consumers live on the same server or not. If they do not, it could be a little tricky for users to test/develop these sources since they probably depend on filesystems/connections/etc. that don't exist on the producer.
>
> I don't know if "context manager that returns an iterator" is a regular enough python concept to be intuitive or if it would be better to invent a new class at that point. The key lifecycle moments are:
>
> * Start - Called when the engine is ready. Today this will be called as soon as the engine starts, but there has been discussion that, in the future, we might want to delay this. For example, if connected to a hash-join node you might not want to start reading the probe side immediately and instead focus on the build side. I don't know if this will ever happen but it's probably better to plan for it. Even in the case where we call it at plan start, it would be useful to have this in case we create a plan and start it much later.
> * Next - Simple (safe to block) method to get the next batch of data
> * Finish - Called to clean up; potentially called before the source is fully read in the case of cancellation
>
> On Sun, Jun 5, 2022 at 9:54 PM Yaron Gvili <rt...@hotmail.com> wrote:
> >
> > > Any C++ extension point could become a pyarrow extension point if desired.
> >
> > Agreed. Still, the listed extension points are for supporting an implementation, not integration. The existing integration code for a Scalar UDF written in Python (see https://github.com/apache/arrow/pull/12590) is the kind of code I have in mind for supporting integration.
> >
> > > I have a few questions then.
> >
> > Before I answer each question, here's a wider view of the design I have in mind. The (Python code of the) data-source UDF is intended to be serialized and embedded in a Substrait plan (I already have this serialization and embedding implemented locally - PR to come). The UDF has/needs no arguments because its code is specific to the plan.
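For concreteness, here is a minimal Python sketch of the source lifecycle described above: a context manager whose __enter__ returns an iterator of RecordBatches, whose next() is free to block, and whose __exit__ releases resources even if the iterator was not exhausted. The class name, the line-oriented file format, and the fixed schema are assumptions for illustration; this is not an existing engine API, and only the pyarrow calls used to build the schema and batches are real.

```python
import itertools
import pyarrow as pa

class LineFileSource:
    """Hypothetical Python data source: a context manager that returns an
    iterator of RecordBatch.  The engine would call __enter__ once it is
    ready to receive data, pull batches with next() from an I/O thread
    (blocking is fine), and call __exit__ on completion or cancellation."""

    schema = pa.schema([("line", pa.string())])

    def __init__(self, path, batch_size=1024):
        self.path = path              # values fixed at construction time
        self.batch_size = batch_size
        self._file = None

    def __enter__(self):
        # Start: open resources only when the engine is ready to read.
        self._file = open(self.path)
        return self._batches()

    def _batches(self):
        # Next: each call may block; no readahead is attempted because the
        # engine promises not to call next() reentrantly.
        while True:
            lines = list(itertools.islice(self._file, self.batch_size))
            if not lines:
                return
            yield pa.RecordBatch.from_arrays(
                [pa.array([line.rstrip("\n") for line in lines])],
                schema=self.schema)

    def __exit__(self, exc_type, exc, tb):
        # Finish: may run before the iterator is exhausted (cancellation);
        # it should not return until all resources are released.
        self._file.close()
```

A plain-Python consumer would drive it the same way an engine might: `with LineFileSource("data.txt") as batches: ...`, iterating until exhausted or cancelled.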
> > The UDF returns a new stream/generator of RecordBatch instances to be used by the source-node in each execution of the plan. The UDF itself is made by a UDF-maker that does get arguments, which are used to derive values that are fixed into the UDF being made.
> >
> > To your questions in order:
> >
> > 1. The data-source UDF is stateless and takes no arguments. The UDF-maker does take arguments, while the RecordBatch stream/generator has state.
> > 2. The data-source UDF does not need any information from the execution plan; it does exactly the same thing in each execution - return a new RecordBatch stream/generator for the same data. Only the source-node that holds the data-source UDF interacts with the execution plan.
> > 3. The data-source UDF is fixed with (rather than receives) values derived by the UDF-maker from its arguments. A path can certainly be included in these values. Functions like "Close" may be implemented in the RecordBatch stream/generator.
> > 4. Using the function registry is actually not a critical part of the design I'm proposing, which is one reason I asked about it. It just seemed to me like a reasonable place for the execution plan to find the data-source UDF after it gets deserialized from the Substrait plan. If the function registry is not used, then the design would need some other registry or mechanism for passing the deserialized data-source UDF to the execution plan.
> > 5. The data-source UDF is specific to an execution plan, so it is definitely specific to the user who created the Substrait plan in which it is embedded. Users (or perhaps authors) can share data-source-UDF-makers, or libraries thereof, for general use.
> >
> > Yaron.
> > ________________________________
> > From: Weston Pace <weston.p...@gmail.com>
> > Sent: Saturday, June 4, 2022 2:41 PM
> > To: dev@arrow.apache.org <dev@arrow.apache.org>
> > Subject: Re: data-source UDFs
> >
> > > The former is about facilities (like extension points) for implementing custom data sources in Arrow whereas the latter is about facilities for integrating in PyArrow (existing or future) data sources written/wrapped in Python
> >
> > Any C++ extension point could become a pyarrow extension point if desired. For example, we already have a pure-python filesystem adapter if a user wants to implement a filesystem purely in python (this is how the fsspec adapter works, I think).
> >
> > > I'm especially interested in feedback about the new function-kind and extensions of "cpp/src/arrow/python/udf.h" I proposed
> >
> > I have a few questions then.
> >
> > You mentioned that these functions would take no arguments. Li's rough example showed a path being provided to the UDF.
> >
> > 1. If the UDF takes no arguments then I will assume it must be stateful. That doesn't really match the current idea of the function registry, although it may be a problem we need to solve someday for aggregate UDFs. What do you imagine the lifecycle is for this function? If it is stateful, would it not be more accurate to call it something other than a function?
> > 2. If the UDF receives no arguments, then do you need to communicate any part of the execution plan (e.g. the Substrait plan) to the UDF? How would you envision this happening?
> > Or is this a source that always does the exact same thing regardless of the execution plan (this could be valid if, for example, the user configures the source with a plan independent of the execution plan before they register it)?
> > 3. If the UDF is not stateful then it receives some arguments. What are they? Does the UDF receive a path? Or does the UDF generate paths? Are there two different kinds of UDFs, ones that receive paths and ones that generate paths?
> >
> > Most of the extension points I described have more than one method. Even in the APIs where I only listed one method there are some utility methods like `Close` which I didn't list for simplicity. While you can generally represent a class as a "bag of functions" (a stateful class is a "bag of functions that takes state as the first argument"), I find that to be more difficult for users to follow than just registering a type with a defined interface.
> >
> > 4. Is there a particular reason in your use case for using the function registry for this?
> > 5. Do you imagine these UDFs would always be specific to particular users? Or would it be possible for such a UDF to be shared as a general-purpose utility?
> >
> > On Sat, Jun 4, 2022 at 3:02 AM Yaron Gvili <rt...@hotmail.com> wrote:
> > >
> > > Thanks for the detailed overview, Weston. I agree with David that this would be very useful to have in a public doc.
> > >
> > > Weston and David's discussion is a good one; however, I see it as separate from the discussion I brought up. The former is about facilities (like extension points) for implementing custom data sources in Arrow, whereas the latter is about facilities for integrating into PyArrow (existing or future) data sources written/wrapped in Python. In this latter discussion, I'm indifferent to the complexities of data source implementation. I'm especially interested in feedback about the new function-kind and extensions of "cpp/src/arrow/python/udf.h" I proposed, as well as possible alternatives to these, and more generally in reaching consensus about how a custom data source written/wrapped in Python would get integrated.
> > >
> > > > At the moment, as we are not exposing the execution engine primitives to the Python user, are you expecting to expose them by this approach?
> > > >
> > > > From our side, these APIs are not directly exposed to the end user, but rather are primitives that we can build on top of.
> > >
> > > For clarity of discussion, I'd suggest distinguishing between a data-source-integrator and an Acero-user (or end-user), since in many use cases these are not the same person. When I wrote user, I meant a data-source-integrator. An Acero-user would not be directly using the facilities I proposed.
> > >
> > > Yaron.
> > > ________________________________
> > > From: David Li <lidav...@apache.org>
> > > Sent: Friday, June 3, 2022 5:53 PM
> > > To: dev@arrow.apache.org <dev@arrow.apache.org>
> > > Subject: Re: data-source UDFs
> > >
> > > Thanks for the overview of the different extension points; it's nice to see this laid out. (It would be great to find a place in the docs for this, IMO, or possibly as a blog post?)
> > >
> > > Just to chime in quickly here:
> > >
> > > For databases/Flight, my hope is that integrating ADBC into Arrow Datasets will take care of both.
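As a concrete illustration of the UDF-maker pattern Yaron describes earlier in the thread, here is a minimal Python sketch: the maker takes arguments, derives values that get fixed into a closure, and the resulting no-argument data-source UDF returns a fresh RecordBatch generator each time it is called. All names here are hypothetical and the serialization/registration step is omitted; only the pyarrow batch-construction calls are real.

```python
import pyarrow as pa

def make_line_source_udf(path, batch_size=1024):
    """UDF-maker: takes arguments and returns a no-argument data-source UDF.
    The returned UDF (not the maker) is what would be serialized and
    embedded in the Substrait plan."""
    schema = pa.schema([("line", pa.string())])

    def line_source_udf():
        # The data-source UDF itself is stateless and takes no arguments;
        # each call returns a *new* RecordBatch generator for the same data.
        def batches():
            with open(path) as f:
                while True:
                    chunk = [f.readline() for _ in range(batch_size)]
                    chunk = [line for line in chunk if line]
                    if not chunk:
                        return
                    yield pa.RecordBatch.from_arrays(
                        [pa.array([line.rstrip("\n") for line in chunk])],
                        schema=schema)
        return batches()

    return line_source_udf

# Hypothetical producer-side usage: the maker fixes its arguments into the
# UDF, which a source node would then call once per plan execution.
udf = make_line_source_udf("/tmp/events.txt")
```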
> > > Plain Flight isn't quite well-defined enough to be meaningfully integrated (except perhaps via a generic "stream of batches" entrypoint), and even if we wanted to feed JDBC/ODBC into an ExecPlan, we'd have to do some work that would look roughly like writing an ADBC driver, so we may as well go that route.
> > >
> > > -David
> > >
> > > On Fri, Jun 3, 2022, at 16:47, Weston Pace wrote:
> > > > Efficiently reading from a data source is something that has a bit of complexity (parsing files, connecting to remote data sources, managing parallel reads, etc.). Ideally we don't want users to have to reinvent these things as they go. The datasets module in Arrow-C++ has a lot of code here already.
> > > >
> > > > So I think there is probably more than one extension point. Some of these extension points already exist. I do believe there is opportunity to create further extension points as well, and the challenge/opportunity here will be figuring out what those are and what their API should be.
> > > >
> > > > ## I can describe a little bit about what we have already:
> > > >
> > > > * Filesystem abstraction
> > > >
> > > > Right now we have a filesystem abstraction (arrow::fs::FileSystem) which is pretty well documented and straightforward. This is how we can swap between local disk, S3, etc. From an Acero/datasets perspective the API is basically "given a path, give me a stream of bytes" (open file) and "given a path, give me a list of files" (list directory).
> > > >
> > > > * FileFormat abstraction
> > > >
> > > > The file format abstraction (arrow::dataset::FileFormat) is how we swap out different kinds of files, for example arrow/orc/parquet/csv/json/... The API is roughly (glossing over plenty of details):
> > > >
> > > > - Convert an input file to a schema (inspect)
> > > > - Convert an input file to a stream of batches (scan)
> > > > - Convert a stream of batches to an output file (write)
> > > >
> > > > * Fragment / Dataset abstraction
> > > >
> > > > The fragment (arrow::dataset::Fragment) & dataset (arrow::dataset::Dataset) APIs are how we describe a collection of files. This is used by the scanner to implement parallel reads. You can think of these as the "source" API. The APIs are roughly:
> > > >
> > > > - Convert a dataset to a stream of fragments (list dataset)
> > > > - Convert a fragment to a stream of batches (scan)
> > > >
> > > > The two main implementations of datasets that we have today are the FilesystemDataset (which uses a filesystem to list files; each file is a fragment, and a filesystem fragment uses a format to convert its file to a stream of batches) and the InMemoryDataset (there is one fragment, and the scan operation is just slicing off pieces of the in-memory data). There are also some niche implementations here, like a dataset that is created from a python iterable of batches. This might be very similar to what you are describing above.
> > > >
> > > > A dataset must be created with a "dataset schema", which is the single schema that all fragments of the dataset can be converted to.
> > > >
> > > > * Custom source nodes
> > > >
> > > > All of the above is exposed to Acero via the scan node, which is responsible for turning a dataset into Acero input. However, the datasets API could be bypassed entirely to feed Acero in other ways.
> > > > Some examples:
> > > >
> > > > - The table source node is a way to feed in-memory data (a table) into Acero. This is very similar to the InMemoryDataset but bypasses some of the overhead of the scanner.
> > > > - The TPC-H source node generates random data for benchmarking purposes.
> > > >
> > > > A lot of things can be expressed either as a simple dataset or as a custom source node. There is a bit of duplication here and I don't know that it matters too much; I'm just pointing this out for pedantic purposes.
> > > >
> > > > The API here is just the ExecNode API, and so the user needs to provide something that starts when StartProducing is called and then calls InputReceived on a regular basis. From the discussions on the scheduler I suspect this API may be changing slightly, but the idea is still there.
> > > >
> > > > ## I'm also aware of a number of things we are still going to need at some point.
> > > >
> > > > * Evolution
> > > >
> > > > Sometimes different files in a dataset have different schemas. A very common case is fields getting added over time, or fields getting renamed or changing data type (e.g. int32 -> int64). We have some support for the former but none for the latter. I've got a pretty good idea of what the API looks like for "evolution", so if that is something needed I could write that up.
> > > >
> > > > * Flight dataset/fragment/source-node?
> > > >
> > > > I don't think there will be exactly a "FlightFragment". Maybe the right term is ADBC, but I haven't been following that discussion closely enough. There needs to be a way to scan a remote data source that provides its data via a standard flight service.
> > > >
> > > > * SQL dataset/fragment/source-node?
> > > >
> > > > It could be very useful to have a dataset that is capable of reading data from SQL databases via something like JDBC (although, if ADBC connectors get built quickly enough, maybe this is never needed :)
> > > >
> > > > * Catalogs
> > > >
> > > > The filesystem dataset currently figures out the dataset schema through a rather expensive inspection process, and it lists its fragments using potentially expensive directory listing. Metadata catalogs (e.g. Hive) and table formats (e.g. Iceberg) often have ways of storing precomputed versions of this information. A dataset that is capable of figuring out what files to scan from a catalog would be valuable. This dataset might use the same filesystem fragment to do the actual scan.
> > > >
> > > > * Table metadata
> > > >
> > > > Very similar to the catalogs discussion is the idea of "table metadata". This is less about reading data and more about describing the data: for example, metadata about any ordering of the incoming data, unique constraints, not-null constraints, etc. All of this information can be used by exec nodes to simplify query processing. For example, if you are grouping on a set of keys and one of the keys is ordered, then you can implement group-by with a streaming (not pipeline-breaking) implementation.
> > > >
> > > >> that allows utilizing existing Python APIs that know how to read a data source as a stream of record batches.
> > > > We have a class called arrow::dataset::<unnamed>::OneShotFragment which knows how to convert a python iterator into a scannable source. This might serve your needs. This was also written when things were more dataset-oriented. It might also be interesting to create a python source node which does the same sort of thing, bypassing the scanner, although I don't know that there would be much concrete benefit.
> > > >
> > > > I hope this information is helpful! It is just background, though. I think I might need to understand your needs in a bit more detail before I can offer any kind of prescriptive advice.
> > > >
> > > > On Fri, Jun 3, 2022 at 8:51 AM Li Jin <ice.xell...@gmail.com> wrote:
> > > >>
> > > >> Actually, "UDF" might be the wrong terminology here - this is more of a "custom Python data source" than "Python user-defined functions". (Although under the hood it can probably reuse lots of the UDF logic to execute the custom data source.)
> > > >>
> > > >> On Fri, Jun 3, 2022 at 2:49 PM Li Jin <ice.xell...@gmail.com> wrote:
> > > >>
> > > >> > What Yaron is going for is really something similar to a custom data source in Spark (https://levelup.gitconnected.com/easy-guide-to-create-a-custom-read-data-source-in-apache-spark-3-194afdc9627a) that allows utilizing existing Python APIs that know how to read a data source as a stream of record batches.

--
Vibhatha Abeykoon
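For readers who want to try the iterator-to-scannable-source path mentioned above from Python today, here is a minimal sketch using pyarrow.dataset.Scanner.from_batches, which appears to be the user-facing wrapper over this kind of one-shot source. The generator below produces made-up data purely for illustration; the pyarrow calls themselves are real.

```python
import pyarrow as pa
import pyarrow.dataset as ds

schema = pa.schema([("x", pa.int64())])

def batch_generator():
    # Stand-in for any existing Python API that yields record batches,
    # e.g. a wrapper around a proprietary reader.
    for start in range(0, 10_000, 1_000):
        yield pa.RecordBatch.from_arrays(
            [pa.array(range(start, start + 1_000), type=pa.int64())],
            schema=schema)

# Wrap the one-shot iterator of batches so it can be consumed like any
# other scannable source (note: it can only be scanned once).
scanner = ds.Scanner.from_batches(batch_generator(), schema=schema)
table = scanner.to_table()
print(table.num_rows)  # 10000
```

The resulting scanner accepts the usual column projection and filter options, which is roughly the functionality a dedicated python source node would otherwise have to duplicate, as noted above.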