Efficiently reading from a data source involves a fair amount of
complexity (parsing files, connecting to remote data sources, managing
parallel reads, etc.) and ideally we don't want users to have to
reinvent these things as they go.  The datasets module in Arrow-C++
already has a lot of code for this.

So I think there is probably more than one extension point.  Some of
these extension points already exist.  I do believe there is
opportunity to create further extension points as well, and the
challenge / opportunity here will be figuring out what those are and
what their APIs should be.

## I can describe a little bit about what we have already:

 * Filesystem abstraction

Right now we have a filesystem abstraction (arrow::fs::FileSystem)
which is pretty well documented and straightforward.  This is how we
can swap between local disk, S3, etc.  From an Acero / datasets
perspective the API is basically "given a path, give me a stream of
bytes" (open file) and "given a path, give me a list of files" (list
directory).
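
To make that concrete, here is a minimal sketch of the abstraction as
it surfaces in pyarrow (the paths and S3 region are hypothetical
placeholders):

```python
# Minimal sketch of the filesystem abstraction as exposed in pyarrow.
# The paths and region below are hypothetical placeholders.
from pyarrow import fs

local = fs.LocalFileSystem()
s3 = fs.S3FileSystem(region="us-east-1")  # same API, different backend

# "given a path, give me a list of files" (list directory)
selector = fs.FileSelector("/tmp/my_dataset", recursive=True)
for info in local.get_file_info(selector):
    print(info.path, info.type, info.size)

# "given a path, give me a stream of bytes" (open file)
with local.open_input_stream("/tmp/my_dataset/part-0.parquet") as stream:
    header = stream.read(4)
```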

 * FileFormat abstraction

The file format abstraction (arrow::dataset::FileFormat) is how we
swap out different kinds of files.  For example,
arrow/orc/parquet/csv/json/...  The API is roughly (glossing over
plenty of details; see the sketch after this list):

 - Convert input file to schema (inspect)
 - Convert input file to a stream of batches (scan)
 - Convert a stream of batches to an output file (write)
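
For illustration, a rough pyarrow sketch of those three operations
(the paths are hypothetical and plenty of options are omitted):

```python
# Rough sketch of the three FileFormat operations via pyarrow.
# The paths are hypothetical placeholders.
import pyarrow.dataset as ds
from pyarrow import fs

local = fs.LocalFileSystem()
fmt = ds.ParquetFileFormat()  # or ds.CsvFileFormat(), ds.IpcFileFormat(), ...

# inspect: convert an input file to a schema
schema = fmt.inspect("/tmp/my_dataset/part-0.parquet", filesystem=local)

# scan: convert an input file to a stream of batches
fragment = fmt.make_fragment("/tmp/my_dataset/part-0.parquet", filesystem=local)
for batch in fragment.to_batches():
    print(batch.num_rows)

# write: convert batches back out to files (here in a different format)
ds.write_dataset(fragment.to_table(), "/tmp/copy", format=ds.IpcFileFormat())
```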

 * Fragment / Dataset abstraction

The fragment (arrow::dataset::Fragment) & dataset
(arrow::dataset::Dataset) APIs are how we describe a collection of
files.  This is used by the scanner to implement parallel reads.  You
can think of these as the "source" API.  The APIs are roughly (see the
sketch after this list):

 - Convert a dataset to a stream of fragments (list dataset)
 - Convert a fragment to a stream of batches (scan)
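
```python
# Sketch of the dataset/fragment "source" API in pyarrow; the path is a
# hypothetical placeholder for a directory of parquet files.
import pyarrow.dataset as ds

dataset = ds.dataset("/tmp/my_dataset", format="parquet")
print(dataset.schema)  # the unified "dataset schema" mentioned below

# list dataset: convert a dataset to a stream of fragments
for fragment in dataset.get_fragments():
    # scan: convert a fragment to a stream of batches
    for batch in fragment.to_batches():
        print(batch.num_rows)
```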

The two main implementations of datasets that we have today are
FilesystemDataset (which uses a filesystem to list files; each file is
a fragment, and a filesystem fragment uses a format to convert its
file into a stream of batches) and InMemoryDataset (there is a single
fragment and the scan operation just slices off pieces of the
in-memory data).  There are also some niche implementations here, like
a dataset that is created from a Python iterable of batches.  That one
might be very similar to what you are describing above.

A dataset must be created with a "dataset schema" which is the single
schema that all fragments of the dataset can be converted to.

 * Custom source nodes

All of the above is exposed to Acero via the scan node, which is
responsible for turning a dataset into Acero input.  However, the
datasets API could be bypassed entirely to feed Acero in other ways.
Some examples:

 - The table source node is a way to feed in-memory data (a table)
into Acero.  This is very similar to the InMemoryDataset but bypasses
some of the overhead of the scanner (a minimal sketch follows this
list).
 - The TPC-H source node generates random data for benchmarking purposes.
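
As a point of reference, this is roughly what the table source node
looks like from Python, assuming a pyarrow build that exposes the
Acero bindings (pyarrow.acero); just a sketch:

```python
# Hedged sketch: feeding an in-memory table straight into Acero via the
# table source node, bypassing the datasets/scanner layer.
# Assumes a pyarrow build that exposes the Acero bindings (pyarrow.acero).
import pyarrow as pa
import pyarrow.acero as acero

table = pa.table({"key": ["a", "b", "a"], "value": [1, 2, 3]})

decl = acero.Declaration("table_source", acero.TableSourceNodeOptions(table))
result = decl.to_table()  # round-trips the table through an Acero plan
```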

A lot of things can be expressed either as a simple dataset or as a
custom source node.  There is a bit of duplication here and I don't
know that it matters too much; I'm just pointing this out for
completeness.

The API here is just the ExecNode API and so the user needs to provide
something that starts when StartProducing is called and then calls
InputReceived on a regular basis.  From the discussions on the
scheduler I suspect this API may be changing slightly but the idea is
still there.

## I'm also aware of a number of things we are still going to need at some point.

 * Evolution

Sometimes different files in a dataset have different schemas.  A very
common case is fields getting added over time, or fields getting
renamed or changing data type (e.g. int32 -> int64).  We have some
support for the former but none for the latter.  I've got a pretty
good idea of what the API for "evolution" looks like, so if that is
something you need I could write it up.
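
There is no dedicated API for this yet, so purely as a sketch of the
idea (the field names and types are hypothetical), today you could
normalize each fragment's data by hand:

```python
# Hand-rolled sketch of "evolution": normalize data read from older
# files to a single target schema.  Field names/types are hypothetical.
import pyarrow as pa

target_schema = pa.schema([("id", pa.int64()), ("name", pa.string())])

def normalize(table: pa.Table) -> pa.Table:
    # Fields added over time: fill older files with all-null columns.
    for field in target_schema:
        if field.name not in table.column_names:
            table = table.append_column(field, pa.nulls(len(table), field.type))
    # Type changes (e.g. int32 -> int64): cast to the target type.
    return table.select(target_schema.names).cast(target_schema)

# e.g. an old file that only had a 32-bit "id" column
old = pa.table({"id": pa.array([1, 2], pa.int32())})
new = normalize(old)  # id: int64, name: all-null string
```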

 * Flight dataset/fragment/source-node?

I don't think there will be exactly a "FlightFragment".  Maybe the
right term is ADBC, but I haven't been following that discussion
closely enough.  Either way, there needs to be a way to scan a remote
data source that provides its data via a standard Flight service.
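
Just to make the idea concrete, here is a hedged sketch of what that
could look like by hand today, going through pyarrow's Flight client
and the scanner (the location and ticket are made up):

```python
# Hedged sketch: pull batches from a Flight service and hand them to
# the scanner.  Location and ticket are hypothetical; error handling omitted.
import pyarrow.dataset as ds
import pyarrow.flight as flight

client = flight.connect("grpc://localhost:8815")
reader = client.do_get(flight.Ticket(b"my-table"))

def batches():
    while True:
        try:
            chunk = reader.read_chunk()
        except StopIteration:
            return
        yield chunk.data  # a RecordBatch

scanner = ds.Scanner.from_batches(batches(), schema=reader.schema)
table = scanner.to_table()
```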

 * SQL dataset/fragment/source-node?

It could be very useful to have a dataset that is capable of reading
data from SQL databases via something like JDBC (although, if ADBC
connectors get built quickly enough, maybe this is never needed :)

 * Catalogs

The filesystem dataset currently figures out the dataset schema
through a rather expensive inspection process and it lists its
fragments using potentially expensive directory listing.  Metadata
catalogs (e.g. hive) and table formats (e.g. iceberg) often have ways
of storing precomputed versions of this information.  A dataset that
is capable of figuring out what files to scan from a catalog would be
valuable.  This dataset might use the same filesystem fragment to do
the actual scan.
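
As a rough illustration of the benefit, if a catalog already knows the
file list and the schema, a dataset can be constructed today without
any inspection or listing (the paths and schema below are hypothetical
stand-ins for catalog-provided metadata):

```python
# Sketch: building a dataset from catalog-provided metadata, skipping
# both schema inspection and directory listing.  Paths/schema are
# hypothetical placeholders.
import pyarrow as pa
import pyarrow.dataset as ds
from pyarrow import fs

paths = ["warehouse/events/part-0.parquet", "warehouse/events/part-1.parquet"]
schema = pa.schema([("ts", pa.timestamp("us")), ("user", pa.string())])

dataset = ds.FileSystemDataset.from_paths(
    paths,
    schema=schema,
    format=ds.ParquetFileFormat(),
    filesystem=fs.LocalFileSystem(),
)
```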

 * Table metadata

Very similar to the catalogs discussion is the idea of "table
metadata".  This is less about reading data and more about describing
the data.  For example, metadata about any ordering of the incoming
data, unique constraints, not-null constraints, etc.  All of this
information can be used by exec nodes to simplify query processing.
For example, if you are grouping on a set of keys and one of the keys
is ordered, then you can implement group-by with a streaming (not
pipeline-breaking) implementation.

> that allows utilizing existing Python APIs that knows how to read data
> source as a stream of record batches.

We have a class called arrow::dataset::<unnamed>::OneShotFragment
which knows how to convert a Python iterator into a scannable source.
This might serve your needs.  It was written back when things were
more dataset-oriented.  It might also be interesting to create a
Python source node which does the same sort of thing, bypassing the
scanner, although I don't know that there would be much concrete
benefit.
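
In pyarrow terms, I believe this is what backs Scanner.from_batches,
so a sketch of wrapping your existing Python reader might look like
the following (the generator is a hypothetical stand-in for whatever
code already knows how to read the data):

```python
# Sketch: wrapping an existing Python generator of record batches into
# a one-shot scannable source.  The generator is a hypothetical stand-in.
import pyarrow as pa
import pyarrow.dataset as ds

schema = pa.schema([("x", pa.int64())])

def read_my_source():
    for i in range(3):
        yield pa.record_batch([pa.array([i], pa.int64())], schema=schema)

scanner = ds.Scanner.from_batches(read_my_source(), schema=schema)
table = scanner.to_table()  # the source can only be consumed once
```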

I hope this information is helpful!  It is just background though.  I
think I might need to understand your needs in a bit more detail
before I can offer any kind of prescriptive advice.

On Fri, Jun 3, 2022 at 8:51 AM Li Jin <ice.xell...@gmail.com> wrote:
>
> Actually, "UDF" might be the wrong terminology here - This is more of a
> "custom Python data source" than "Python user defined functions". (Although
> under the hood it can probably reuse lots of the UDF logic to execute the
> custom data source)
>
> On Fri, Jun 3, 2022 at 2:49 PM Li Jin <ice.xell...@gmail.com> wrote:
>
> > What Yaron is going for is really something similar to custom data source
> > in Spark (
> > https://levelup.gitconnected.com/easy-guide-to-create-a-custom-read-data-source-in-apache-spark-3-194afdc9627a)
> > that allows utilizing existing Python APIs that knows how to read data
> > source as a stream of record batches.
> >
> >
> >
