Thanks for the overview of the different extension points; it's nice to see this laid out. (It would be great to find a place in the docs for this, IMO, or possibly as a blog post?)
Just to chime in quickly here: For databases/Flight, my hope is that integrating ADBC into Arrow Datasets will take care of both. Plain Flight isn't quite well-defined enough to be meaningfully integrated (except perhaps via a generic "stream of batches" entrypoint), and even if we wanted to feed JDBC/ODBC into an ExecPlan, we'd have to do some work that would look roughly like writing an ADBC driver, so we may as well go that route.

-David

On Fri, Jun 3, 2022, at 16:47, Weston Pace wrote:
> Efficiently reading from a data source is something that has a bit of complexity (parsing files, connecting to remote data sources, managing parallel reads, etc.). Ideally we don't want users to have to reinvent these things as they go. The datasets module in Arrow-C++ has a lot of code here already.
>
> So I think there is probably more than one extension point. Some of these extension points already exist. I do believe there is opportunity to create further extension points as well, and the challenge / opportunity here will be figuring out what those are and what their API should be.
>
> ## I can describe a little bit about what we have already:
>
> * Filesystem abstraction
>
> Right now we have a filesystem abstraction (arrow::fs::FileSystem) which is pretty well documented and straightforward. This is how we can swap between local disk, S3, etc. From an Acero / datasets perspective the API is basically "given a path, give me a stream of bytes" (open file) and "given a path, give me a list of files" (list directory).
>
> * FileFormat abstraction
>
> The file format abstraction (arrow::dataset::FileFormat) is how we swap out different kinds of files, for example arrow/orc/parquet/csv/json/... The API is roughly (glossing over plenty of details):
>
> - Convert an input file to a schema (inspect)
> - Convert an input file to a stream of batches (scan)
> - Convert a stream of batches to an output file (write)
>
> * Fragment / Dataset abstraction
>
> The fragment (arrow::dataset::Fragment) & dataset (arrow::dataset::Dataset) APIs are how we describe a collection of files. This is used by the scanner to implement parallel reads. You can think of these as the "source" API. The APIs are roughly:
>
> - Convert a dataset to a stream of fragments (list dataset)
> - Convert a fragment to a stream of batches (scan)
>
> The two main implementations of datasets that we have today are the FilesystemDataset (uses a filesystem to list files; each file is a fragment, and a filesystem fragment uses a format to convert its file to a stream of batches) and the InMemoryDataset (there is one fragment and the scan operation is just slicing off pieces of the in-memory data). There are also some niche implementations here, like a dataset that is created from a python iterable of batches. This might be very similar to what you are describing above.
>
> A dataset must be created with a "dataset schema", which is the single schema that all fragments of the dataset can be converted to.
>
> * Custom source nodes
>
> All of the above is exposed to Acero via the scan node, which is responsible for turning a dataset into Acero input. However, the datasets API could be bypassed entirely to feed Acero in other ways. Some examples:
>
> - The table source node is a way to feed in-memory data (a table) into Acero. This is very similar to the InMemoryDataset but bypasses some of the overhead of the scanner.
> - The TPC-H source node generates random data for benchmarking purposes.
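(As a quick aside for anyone following along from Python: here is a rough pyarrow sketch of how the filesystem, format, and dataset/fragment layers described above compose. The path is made up and it only touches the public Python wrappers over the C++ classes Weston mentions, so treat it as an illustration rather than a recipe.)

```python
import pyarrow.dataset as ds
import pyarrow.fs as fs

# Filesystem abstraction: "given a path, list files / give me a stream of bytes".
# LocalFileSystem could be swapped for fs.S3FileSystem(...) without changing
# anything else below.  The path is a made-up example.
filesystem = fs.LocalFileSystem()
listing = filesystem.get_file_info(fs.FileSelector("/tmp/my_dataset", recursive=True))

# Dataset + FileFormat abstraction: format="parquet" selects the FileFormat
# implementation; the resulting dataset carries the single "dataset schema".
dataset = ds.dataset("/tmp/my_dataset", format="parquet", filesystem=filesystem)
print(dataset.schema)

# Fragment abstraction: a dataset is a stream of fragments, and each fragment
# can be scanned into a stream of record batches.
for fragment in dataset.get_fragments():
    for batch in fragment.to_batches():
        ...  # hand the batches to Acero, build a Table, etc.
```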
>
> A lot of things can be expressed either as a simple dataset or as a custom source node. There is a bit of duplication here and I don't know that it matters too much. I'm just pointing this out for pedantic purposes.
>
> The API here is just the ExecNode API, so the user needs to provide something that starts when StartProducing is called and then calls InputReceived on a regular basis. From the discussions on the scheduler I suspect this API may be changing slightly, but the idea is still there.
>
> ## I'm also aware of a number of things we are still going to need at some point.
>
> * Evolution
>
> Sometimes different files in a dataset have different schemas. A very common case is fields getting added over time, or fields getting renamed or changing data type (e.g. int32 -> int64). We have some support for the former but none for the latter. I've got a pretty good idea of what the API looks like for "evolution", so if that is something needed I could write that up.
>
> * Flight dataset/fragment/source-node?
>
> I don't think there will be exactly a "FlightFragment". Maybe the right term is ADBC, but I haven't been following that discussion closely enough. There needs to be a way to scan a remote data source that provides its data via a standard Flight service.
>
> * SQL dataset/fragment/source-node?
>
> It could be very useful to have a dataset that is capable of reading data from SQL databases via something like JDBC (although, if ADBC connectors get built quickly enough, maybe this is never needed :)
>
> * Catalogs
>
> The filesystem dataset currently figures out the dataset schema through a rather expensive inspection process, and it lists its fragments using potentially expensive directory listing. Metadata catalogs (e.g. Hive) and table formats (e.g. Iceberg) often have ways of storing precomputed versions of this information. A dataset that is capable of figuring out what files to scan from a catalog would be valuable. This dataset might use the same filesystem fragment to do the actual scan.
>
> * Table metadata
>
> Very similar to the catalogs discussion is the idea of "table metadata". This is less about reading data and more about describing the data: for example, metadata about any ordering of the incoming data, unique constraints, not-null constraints, etc. All of this information can be used by exec nodes to simplify query processing. For example, if you are grouping on a set of keys and one of the keys is ordered, then you can implement group by with a streaming (not pipeline-breaking) implementation.
>
>> that allows utilizing existing Python APIs that know how to read a data
>> source as a stream of record batches.
>
> We have a class called arrow::dataset::<unnamed>::OneShotFragment which knows how to convert a python iterator into a scannable source. This might serve your needs. This was also written when things were more dataset-oriented. It might also be interesting to create a python source node which does the same sort of thing, bypassing the scanner, although I don't know that there would be much concrete benefit.
>
> I hope this information is helpful! It is just background though. I think I might need to understand your needs in a bit more detail before I can offer any kind of prescriptive advice.
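(To make the OneShotFragment point concrete from the Python side: something very close is already reachable via Scanner.from_batches. A minimal sketch, where the generator stands in for whatever Python code actually produces the batches; like the fragment, the resulting scan can only be consumed once.)

```python
import pyarrow as pa
import pyarrow.dataset as ds

schema = pa.schema([("x", pa.int64())])

def batch_iterator():
    # Stand-in for an arbitrary Python source of record batches.
    for start in (0, 10, 20):
        yield pa.RecordBatch.from_pydict({"x": [start, start + 1]}, schema=schema)

# One-shot: the scanner consumes the iterator, so it can only be scanned once.
scanner = ds.Scanner.from_batches(batch_iterator(), schema=schema)
table = scanner.to_table()
print(table.num_rows)  # 6
```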
>
> On Fri, Jun 3, 2022 at 8:51 AM Li Jin <[email protected]> wrote:
>>
>> Actually, "UDF" might be the wrong terminology here - this is more of a "custom Python data source" than "Python user defined functions". (Although under the hood it can probably reuse lots of the UDF logic to execute the custom data source.)
>>
>> On Fri, Jun 3, 2022 at 2:49 PM Li Jin <[email protected]> wrote:
>>
>> > What Yaron is going for is really something similar to a custom data source in Spark (https://levelup.gitconnected.com/easy-guide-to-create-a-custom-read-data-source-in-apache-spark-3-194afdc9627a) that allows utilizing existing Python APIs that know how to read a data source as a stream of record batches.
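(Putting the "custom Python data source" framing in pyarrow terms, one rough sketch is to adapt an existing Python reader by handing its batches to a RecordBatchReader. MyCustomSource below is hypothetical; the point is just the generic "stream of batches" handoff.)

```python
import pyarrow as pa

class MyCustomSource:
    """Hypothetical stand-in for an existing Python API that reads some
    external data source as record batches."""
    schema = pa.schema([("value", pa.float64())])

    def read_batches(self):
        for chunk in ([1.0, 2.0], [3.0]):
            yield pa.RecordBatch.from_pydict({"value": chunk}, schema=self.schema)

source = MyCustomSource()
# RecordBatchReader is the generic handoff point: anything that accepts a
# reader (e.g. ds.Scanner.from_batches, ds.write_dataset) can consume it
# without knowing where the data came from.
reader = pa.RecordBatchReader.from_batches(source.schema, source.read_batches())
print(reader.read_all())
```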
