I spent some time last week looking at the current data source v2 APIs, and I thought we should be a bit more buttoned up in terms of the abstractions and the guarantees Spark provides. In particular, I feel we need the following levels of "abstractions" to fit the use cases in Spark, from batch to streaming.
Please don't focus on the naming at this stage. When possible, I draw parallels to what similar levels are named in the currently committed API:

0. Format: This represents a specific format, e.g. Parquet or ORC. There is currently no explicit class at this level.

1. Table: This should represent a logical dataset (with schema). This could be just a directory on the file system, or a table in the catalog. Operations on tables can include batch reads (Scan), streams, writes, and potentially other operations such as deletes. The closest to the table-level abstraction in the current code base is the "Provider" class, although Provider isn't quite a Table. This is similar to Ryan's proposed design.

2. Stream: Specific to streaming. A stream is created out of a Table. This logically represents one instance of a StreamingQuery. Pushdowns and options are handled at this layer, i.e. Spark guarantees to the data source implementation that pushdowns and options don't change within a Stream. Each Stream consists of a sequence of scans. There is no equivalent concept in the currently committed code.

3. Scan: A physical scan -- either as part of a streaming query or a batch query. This should contain sufficient information and methods so we can run a Spark job over a defined subset of the table. It's functionally equivalent to an RDD, except there's no dependency on RDD, so it is a smaller surface. In the current code, the closest equivalent would be ScanConfig, which represents the information needed; but in order to execute a job, ReadSupport is also required (various methods in ReadSupport take a ScanConfig).

To illustrate with pseudocode what the different levels mean, a batch query would look like the following:

  val provider = reflection[Format]("parquet")
  val table = provider.createTable(options)
  val scan = table.createScan(scanConfig) // scanConfig includes pushdowns and options
  // run tasks on executors

A streaming micro-batch scan would look like the following:

  val provider = reflection[Format]("parquet")
  val table = provider.createTable(options)
  val stream = table.createStream(scanConfig) // pushdowns and options are fixed here
  while (true) {
    val scan = stream.createScan(startOffset) // one scan per micro-batch
    // run tasks on executors
  }

Compared with the current API, the above:

1. Creates an explicit Table abstraction and an explicit Scan abstraction.

2. Adds an explicit Stream level and makes it clear that pushdowns and options are handled there, rather than at the individual scan (ReadSupport) level. Data source implementations don't need to worry about pushdowns or options changing mid-stream. For batch, those are settled when the scan object is created.

This email is just a high-level sketch. I've asked Wenchen to prototype this, to see whether it is actually feasible and the degree of hacks it removes, or creates.
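To make the level boundaries a bit more concrete, here is a rough Scala sketch of what interfaces at each level could look like. Every name and signature below is an illustrative assumption made to mirror the pseudocode above (ScanConfig, Offset, ReadTask, planTasks, etc. are placeholders), not a proposal for the final API:

  // Illustrative sketch only; all names and signatures are assumptions, not a committed API.
  import org.apache.spark.sql.types.StructType

  // Placeholders for the pieces carried between levels.
  trait ScanConfig   // pushdowns + options, fixed when a Scan or Stream is created
  trait Offset       // position in a stream
  trait ReadTask     // a unit of work a single executor task can run

  // Level 0: a format such as parquet or orc, looked up by short name via reflection.
  trait Format {
    def createTable(options: Map[String, String]): Table
  }

  // Level 1: a logical dataset with a schema; the entry point for scans and streams.
  trait Table {
    def schema: StructType
    def createScan(config: ScanConfig): Scan      // batch: pushdowns/options settled here
    def createStream(config: ScanConfig): Stream  // streaming: pushdowns/options settled here
  }

  // Level 2: one run of a streaming query; the config is frozen for the stream's lifetime.
  trait Stream {
    def createScan(start: Offset): Scan           // one Scan per micro-batch
  }

  // Level 3: a physical scan over a defined subset of the table; enough to launch a job.
  trait Scan {
    def planTasks(): Seq[ReadTask]                // what each executor task should read
  }

The point the sketch tries to capture is that ScanConfig is consumed at the Table level (batch) or the Stream level (streaming), so nothing below that level ever sees pushdowns or options change.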