Thanks for the overview of the different extension points; it's nice to see
this laid out. (It would be great to find a place in the docs for this, IMO, or
possibly as a blog post?)

Just to chime in quickly here:

For databases/Flight, my hope is that integrating ADBC into Arrow Datasets will 
take care of both. Plain Flight isn't quite well-defined enough to be 
meaningfully integrated (except perhaps via a generic "stream of batches" 
entrypoint), and even if we wanted to feed JDBC/ODBC into an ExecPlan, we'd 
have to do some work that would look roughly like writing an ADBC driver, so we 
may as well go that route.

-David

On Fri, Jun 3, 2022, at 16:47, Weston Pace wrote:
> Efficiently reading from a data source involves a fair bit of
> complexity (parsing files, connecting to remote data sources, managing
> parallel reads, etc.).  Ideally we don't want users to have to reinvent
> these things as they go.  The datasets module in Arrow-C++ already has
> a lot of code here.
>
> So I think there is probably more than one extension point.  Some of
> these extension points already exist.  I do believe there is
> opportunity to create further extension points as well, and the
> challenge / opportunity here will be figuring out what those are and
> what their APIs should be.
>
> ## I can describe a little of what we have already:
>
>  * Filesystem abstraction
>
> Right now we have a filesystem abstraction (arrow::fs::FileSystem)
> which is pretty well documented and straightforward.  This is how we
> can swap between local disk, S3, etc.  From an Acero / datasets
> perspective the API is basically "given a path, give me a stream of
> bytes" (open file) and "given a path, give me a list of files" (list
> directory).
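>
> For illustration, a rough sketch of that surface against the local
> filesystem (the directory is made up, and I'm assuming the listing is
> non-empty):
>
>   #include <arrow/filesystem/api.h>
>   #include <arrow/result.h>
>   #include <memory>
>
>   arrow::Status ListAndOpen() {
>     auto fs = std::make_shared<arrow::fs::LocalFileSystem>();
>     // "Given a path, give me a list of files" (list directory)
>     arrow::fs::FileSelector selector;
>     selector.base_dir = "/tmp/my_dataset";  // hypothetical directory
>     selector.recursive = true;
>     ARROW_ASSIGN_OR_RAISE(auto infos, fs->GetFileInfo(selector));
>     // "Given a path, give me a stream of bytes" (open file);
>     // assumes the first entry is a regular file
>     ARROW_ASSIGN_OR_RAISE(auto file, fs->OpenInputFile(infos[0].path()));
>     return arrow::Status::OK();
>   }
>
> Swapping in S3 (or anything you can get from FileSystemFromUri) doesn't
> change the calling code.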
>
>  * FileFormat abstraction
>
> The file format abstraction (arrow::dataset::FileFormat) is how we
> swap out different kinds of files, for example
> arrow/orc/parquet/csv/json/...  The API is roughly (glossing over
> plenty of details):
>
>  - Convert input file to schema (inspect)
>  - Convert input file to a stream of batches (scan)
>  - Convert a stream of batches to an output file (write)
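>
> For example, the inspect piece with the Parquet format looks roughly
> like this (made-up path; assumes the dataset and Parquet components
> are built in):
>
>   #include <arrow/dataset/api.h>
>   #include <arrow/filesystem/api.h>
>   #include <arrow/result.h>
>   #include <memory>
>
>   arrow::Status InspectOne() {
>     auto fs = std::make_shared<arrow::fs::LocalFileSystem>();
>     auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
>     // "Convert input file to schema" (inspect)
>     arrow::dataset::FileSource source("/tmp/my_dataset/part-0.parquet", fs);
>     ARROW_ASSIGN_OR_RAISE(auto schema, format->Inspect(source));
>     return arrow::Status::OK();
>   }
>
> The scan and write halves hang off the same object (ScanBatchesAsync
> and MakeWriter, if I remember the names right), with plenty of options
> I'm glossing over.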
>
>  * Fragment / Dataset abstraction
>
> The fragment (arrow::dataset::Fragment) & dataset
> (arrow::dataset::Dataset) APIs are how we describe a collection of
> files.  This is used by the scanner to implement parallel reads.  You
> can think of these as the "source" API.  The APIs are roughly
>
>  - Convert a dataset to a stream of fragments (list dataset)
>  - Convert a fragment to a stream of batches (scan)
>
> The two main implementations of datasets that we have today are the
> FileSystemDataset (uses a filesystem to list files; each file is a
> fragment, and a filesystem fragment uses a format to convert its file
> to a stream of batches) and the InMemoryDataset (there is one fragment
> and the scan operation just slices off pieces of the in-memory data).
> There are also some niche implementations here, like a dataset that is
> created from a Python iterable of batches.  This might be very similar
> to what you are describing above.
>
> A dataset must be created with a "dataset schema" which is the single
> schema that all fragments of the dataset can be converted to.
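>
> Putting those pieces together, building and scanning a filesystem
> dataset looks roughly like this (made-up directory; discovery figures
> out the dataset schema by inspecting files):
>
>   #include <arrow/dataset/api.h>
>   #include <arrow/filesystem/api.h>
>   #include <arrow/result.h>
>   #include <memory>
>
>   arrow::Status ScanDirectory() {
>     auto fs = std::make_shared<arrow::fs::LocalFileSystem>();
>     auto format = std::make_shared<arrow::dataset::ParquetFileFormat>();
>     arrow::fs::FileSelector selector;
>     selector.base_dir = "/tmp/my_dataset";  // hypothetical
>     selector.recursive = true;
>     // List dataset: each discovered file becomes a fragment
>     ARROW_ASSIGN_OR_RAISE(
>         auto factory,
>         arrow::dataset::FileSystemDatasetFactory::Make(
>             fs, selector, format,
>             arrow::dataset::FileSystemFactoryOptions{}));
>     ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());
>     // Scan: convert each fragment to a stream of batches (in parallel)
>     ARROW_ASSIGN_OR_RAISE(auto builder, dataset->NewScan());
>     ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish());
>     ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable());
>     return arrow::Status::OK();
>   }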
>
>  * Custom source nodes
>
> All of the above is exposed to Acero via the scan node, which is
> responsible for turning a dataset into Acero input.  However, the
> datasets API could be bypassed entirely to feed Acero in other ways.
> Some examples:
>
>  - The table source node is a way to feed in-memory data (a table)
> into Acero.  This is very similar to the InMemoryDataset but bypasses
> some of the overhead of the scanner.
>  - The TPC-H source node generates random data for benchmarking purposes.
>
> A lot of things can be expressed either as a simple dataset or as a
> custom source node.  There is a bit of duplication here and I don't
> know that it matters too much.  I'm just pointing this out for
> pedantic purposes.
>
> The API here is just the ExecNode API, so the user needs to provide
> something that starts when StartProducing is called and then calls
> InputReceived on its output on a regular basis.  From the discussions
> on the scheduler I suspect this API may change slightly, but the idea
> is still there.
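>
> As a very rough sketch of what feeding a plan looks like today (the
> exact signatures here are the part most likely to move with the
> scheduler work, so treat this as approximate):
>
>   #include <arrow/compute/exec.h>
>   #include <arrow/compute/exec/exec_plan.h>
>   #include <arrow/compute/exec/options.h>
>   #include <arrow/result.h>
>   #include <arrow/table.h>
>   #include <memory>
>
>   arrow::Status FeedTable(std::shared_ptr<arrow::Table> table) {
>     arrow::compute::ExecContext exec_context;
>     ARROW_ASSIGN_OR_RAISE(auto plan,
>                           arrow::compute::ExecPlan::Make(&exec_context));
>     // Feed in-memory data directly, bypassing the scanner
>     ARROW_ASSIGN_OR_RAISE(
>         arrow::compute::ExecNode* source,
>         arrow::compute::MakeExecNode(
>             "table_source", plan.get(), /*inputs=*/{},
>             arrow::compute::TableSourceNodeOptions(
>                 table, /*max_batch_size=*/1 << 16)));
>     // Downstream nodes would take {source} as their input; calling
>     // plan->StartProducing() then kicks everything off.
>     return arrow::Status::OK();
>   }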
>
> ## I'm also aware of a number of things we are still going to need at
> some point.
>
>  * Evolution
>
> Sometimes different files in a dataset have different schemas.  A very
> common case is fields getting added over time, fields getting renamed,
> or fields changing data type (e.g. int32 -> int64).  We have some
> support for added fields but none for renames or type changes.  I've
> got a pretty good idea of what the API looks like for "evolution", so
> if that is something you need I could write it up.
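>
> To make the common case concrete (hypothetical schemas for two files
> written at different times):
>
>   #include <arrow/type.h>
>
>   // The older file:
>   auto schema_v1 = arrow::schema({arrow::field("id", arrow::int32())});
>   // The newer file:
>   auto schema_v2 = arrow::schema({arrow::field("id", arrow::int64()),
>                                   arrow::field("name", arrow::utf8())});
>   // "Evolution" would let both files scan as the current dataset schema
>   // (id: int64, name: string), casting id and null-filling name for the
>   // older file.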
>
>  * Flight dataset/fragment/source-node?
>
> I don't think there will be exactly a "FlightFragment".  Maybe the
> right term is ADBC, but I haven't been following that discussion
> closely enough.  There needs to be a way to scan a remote data source
> that provides its data via a standard Flight service.
>
>  * SQL dataset/fragment/source-node?
>
> It could be very useful to have a dataset that is capable of reading
> data from SQL databases via something like JDBC, although if ADBC
> connectors get built quickly enough, maybe this is never needed :)
>
>  * Catalogs
>
> The filesystem dataset currently figures out the dataset schema
> through a rather expensive inspection process and it lists its
> fragments using potentially expensive directory listing.  Metadata
> catalogs (e.g. Hive) and table formats (e.g. Iceberg) often have ways
> of storing precomputed versions of this information.  A dataset that
> is capable of figuring out what files to scan from a catalog would be
> valuable.  This dataset might use the same filesystem fragment to do
> the actual scan.
>
>  * Table metadata
>
> Very similar to the catalogs discussion is the idea of "table
> metadata".  This is less about reading data and more about describing
> the data.  For example, metadata about any ordering of the incoming
> data, unique constraints, not-null constraints, etc.  All of this
> information can be used by exec nodes to simplify query processing.
> For example, if you are grouping on a set of keys and one of the keys
> is ordered, then you can implement group-by with a streaming (not
> pipeline-breaking) implementation.
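>
> As a toy illustration of that last point (nothing Arrow-specific, just
> the idea):
>
>   #include <cstddef>
>   #include <cstdint>
>   #include <iostream>
>   #include <vector>
>
>   // If the key column is known to be sorted, each group can be emitted
>   // as soon as the key changes; no hash table over the whole input (and
>   // no pipeline break) is needed.
>   void StreamingGroupBySum(const std::vector<int32_t>& keys,
>                            const std::vector<int64_t>& values) {
>     if (keys.empty()) return;
>     int32_t current = keys[0];
>     int64_t sum = 0;
>     for (std::size_t i = 0; i < keys.size(); ++i) {
>       if (keys[i] != current) {
>         std::cout << current << " -> " << sum << "\n";  // group finished
>         current = keys[i];
>         sum = 0;
>       }
>       sum += values[i];
>     }
>     std::cout << current << " -> " << sum << "\n";
>   }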
>
>> that allows utilizing existing Python APIs that know how to read a data
>> source as a stream of record batches.
>
> We have a class called arrow::dataset::<unnamed>::OneShotFragment
> which knows how to convert a Python iterator into a scannable source.
> This might serve your needs.  This was also written when things were
> more dataset-oriented.  It might also be interesting to create a
> Python source node which does the same sort of thing, bypassing the
> scanner, although I don't know that there would be much concrete
> benefit.
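>
> If memory serves, the C++ side of that is just wrapping any
> RecordBatchReader, something like the following (treat the exact
> entry point as an assumption on my part):
>
>   #include <arrow/dataset/api.h>
>   #include <arrow/record_batch.h>
>   #include <arrow/result.h>
>   #include <memory>
>
>   arrow::Status ScanFromReader(
>       std::shared_ptr<arrow::RecordBatchReader> reader) {
>     // The reader could be backed by a Python iterator, a socket, etc.,
>     // and can only be consumed once (hence "one shot").
>     auto builder =
>         arrow::dataset::ScannerBuilder::FromRecordBatchReader(
>             std::move(reader));
>     ARROW_ASSIGN_OR_RAISE(auto scanner, builder->Finish());
>     ARROW_ASSIGN_OR_RAISE(auto table, scanner->ToTable());
>     return arrow::Status::OK();
>   }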
>
> I hope this information is helpful!  It is just background though.  I
> think I might need to understand your needs in a bit more detail
> before I can offer any kind of prescriptive advice.
>
> On Fri, Jun 3, 2022 at 8:51 AM Li Jin <[email protected]> wrote:
>>
>> Actually, "UDF" might be the wrong terminology here - This is more of a
>> "custom Python data source" than "Python user defined functions". (Although
>> under the hood it can probably reuse lots of the UDF logic to execute the
>> custom data source)
>>
>> On Fri, Jun 3, 2022 at 2:49 PM Li Jin <[email protected]> wrote:
>>
>> > What Yaron is going for is really something similar to a custom data source
>> > in Spark (
>> > https://levelup.gitconnected.com/easy-guide-to-create-a-custom-read-data-source-in-apache-spark-3-194afdc9627a)
>> > that allows utilizing existing Python APIs that know how to read a data
>> > source as a stream of record batches.
>> >
>> >
>> >
