I spent some time last week looking at the current data source v2 apis, and
I thought we should be a bit more buttoned up in terms of the abstractions
and the guarantees Spark provides. In particular, I feel we need the
following levels of "abstractions", to fit the use cases in Spark, from
batch, to streaming.

Please don't focus on the naming at this stage. When possible, I draw
parallels to what similar levels are named in the currently committed api:

0. Format: This represents a specific format, e.g. Parquet, ORC. There is
currently no explicit class at this level.

1. Table: This should represent a logical dataset (with schema). This could
be just a directory on the file system, or a table in the catalog.
Operations on tables can include batch reads (Scan), streams, writes, and
potentially other operations such as deletes. The closest to the table
level abstraction in the current code base is the "Provider" class,
although Provider isn't quite a Table. This is similar to Ryan's proposed
design.

2. Stream: Specific to streaming. A stream is created out of a Table. This
logically represents an instance of a StreamingQuery. Pushdowns and
options are handled at this layer, i.e. Spark guarantees to the data source
implementation that pushdowns and options don't change within a Stream. Each
Stream consists of a sequence of scans. There is no equivalent concept in
the current committed code.

3. Scan: A physical scan -- either as part of a streaming query, or a batch
query. This should contain sufficient information and methods so we can run
a Spark job over a defined subset of the table. It's functionally
equivalent to an RDD, except there's no dependency on RDD so it is a
smaller surface. In the current code, the equivalent class would be the
ScanConfig, which represents the information needed, but in order to
execute a job, ReadSupport is needed (various methods in ReadSupport take
a ScanConfig).
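
To make the layering a bit more concrete, here is a rough Scala sketch of how
the four levels could relate to each other. Every name and signature in it is
a hypothetical placeholder (including this simplified ScanConfig, the Offset
type, and the toy in-memory implementation); it is not the committed API and
not necessarily what a prototype would end up with.

```scala
// Hypothetical sketch: every name and signature here is illustrative,
// not the committed data source v2 API.
final case class ScanConfig(pushedFilters: Seq[String], options: Map[String, String])
final case class Offset(value: Long)

// Level 3: a physical scan over a defined subset of the table.
trait Scan {
  def config: ScanConfig
  def partitions: Seq[Seq[Int]] // stand-in for the per-task input descriptions
}

// Level 2: streaming only. Pushdowns and options are fixed when the stream is created.
trait Stream {
  def config: ScanConfig
  def createScan(start: Offset): Scan
}

// Level 1: a logical dataset with a schema.
trait Table {
  def schema: Seq[String]
  def createScan(config: ScanConfig): Scan
  def createStream(config: ScanConfig): Stream
}

// Level 0: a specific format, e.g. Parquet or ORC.
trait Format {
  def createTable(options: Map[String, String]): Table
}

// Toy in-memory implementation so the sketch can be exercised end to end.
final class InMemoryTable(rows: Vector[Int]) extends Table {
  def schema: Seq[String] = Seq("value: int")

  def createScan(config: ScanConfig): Scan = scanFrom(0, config)

  def createStream(streamConfig: ScanConfig): Stream = new Stream {
    val config: ScanConfig = streamConfig
    // Each scan reuses the stream's config; only the start offset varies.
    def createScan(start: Offset): Scan = scanFrom(start.value.toInt, config)
  }

  private def scanFrom(start: Int, scanConfig: ScanConfig): Scan = new Scan {
    val config: ScanConfig = scanConfig
    val partitions: Seq[Seq[Int]] =
      rows.drop(start).grouped(2).map(_.toSeq).toVector
  }
}

object InMemoryFormat extends Format {
  def createTable(options: Map[String, String]): Table =
    new InMemoryTable(Vector(1, 2, 3, 4, 5))
}
```

The point of the toy implementation is only that a Stream captures its
ScanConfig once and hands the same config to every Scan it creates, while a
batch Scan gets its config directly from the Table.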


To illustrate with pseudocode what the different levels mean, a batch query
would look like the following:

val provider = reflection[Format]("parquet")
val table = provider.createTable(options)
// scanConfig includes pushdowns and options
val scan = table.createScan(scanConfig)
// run tasks on executors

A streaming micro-batch scan would look like the following:

val provider = reflection[Format]("parquet")
val table = provider.createTable(options)
val stream = table.createStream(scanConfig)

while(true) {
  val scan = stream.createScan(startOffset)
  // run tasks on executors
}
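
For what it's worth, the micro-batch loop can be simulated with a small
self-contained Scala toy. This is only a sketch under assumptions: ToyStream,
MicroBatch, Offset and MicroBatchDriver are made-up names, the "pushdown" is
just a predicate fixed when the stream is created, and the loop stops when no
new data arrives instead of running forever.

```scala
// Hypothetical names throughout; a bounded stand-in for the while(true) loop.
final case class Offset(value: Int)
final case class MicroBatch(rows: Seq[Int], end: Offset)

// A stream whose pushed filter is fixed at creation; only the offset varies per scan.
final class ToyStream(source: Vector[Int], pushedFilter: Int => Boolean, batchSize: Int) {
  def createScan(start: Offset): MicroBatch = {
    val slice = source.slice(start.value, start.value + batchSize)
    MicroBatch(slice.filter(pushedFilter), Offset(start.value + slice.length))
  }
}

object MicroBatchDriver {
  // Runs scans until the source is exhausted and returns each batch's rows.
  def run(stream: ToyStream, start: Offset): Vector[Seq[Int]] = {
    @annotation.tailrec
    def loop(offset: Offset, acc: Vector[Seq[Int]]): Vector[Seq[Int]] = {
      val batch = stream.createScan(offset)
      if (batch.end == offset) acc // no new data: stop instead of looping forever
      else loop(batch.end, acc :+ batch.rows)
    }
    loop(start, Vector.empty)
  }
}
```

Each iteration passes only an offset to createScan; the filter never changes
between scans, which is the guarantee the Stream level is meant to give data
source implementations.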


Vs the current API, the above:

1. Creates an explicit Table abstraction, and an explicit Scan abstraction.

2. Has an explicit Stream level and makes it clear that pushdowns and options
are handled there, rather than at the individual scan (ReadSupport) level.
Data source implementations don't need to worry about pushdowns or options
changing mid-stream. For batch, those happen when the scan object is
created.



This email is just a high level sketch. I've asked Wenchen to prototype
this, to see if it is actually feasible and the degree of hacks it removes,
or creates.
