I want to bring back the discussion of the data source v2 abstraction. There is a problem Hyukjin discovered recently: a write-only data source may accept any input and does not itself have a schema, so the table abstraction doesn't fit it, because a table must provide a schema.

Personally I think this is a corner case. Who would develop a data source like that? If users do have this requirement, maybe they can just implement a table with an empty schema, and have the append operator in Spark skip input schema validation when the table schema is empty.
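To make the workaround concrete, here is a minimal sketch in Scala, assuming the abstraction names we have been discussing in this thread (Table, LogicalWrite) rather than any committed Spark interface; the sink class is hypothetical:

import org.apache.spark.sql.types.StructType

// Hypothetical interfaces standing in for the abstractions in this thread,
// not a committed Spark API.
trait LogicalWrite {
  def commit(): Unit
  def abort(): Unit
}

trait Table {
  def schema(): StructType
  def newAppendWrite(): LogicalWrite
}

// A write-only sink that accepts any input: it reports an empty schema,
// and the append operator would skip input schema validation in that case.
class AnyInputSinkTable extends Table {
  override def schema(): StructType = new StructType()  // empty schema
  override def newAppendWrite(): LogicalWrite = new LogicalWrite {
    override def commit(): Unit = ()  // no-op for the sketch
    override def abort(): Unit = ()
  }
}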
Any thoughts?

Thanks,
Wenchen

On Thu, Sep 20, 2018 at 4:51 AM Thakrar, Jayesh <jthak...@conversantmedia.com> wrote:

> Thanks for the info Ryan – very helpful!
>
> *From:* Ryan Blue <rb...@netflix.com>
> *Reply-To:* "rb...@netflix.com" <rb...@netflix.com>
> *Date:* Wednesday, September 19, 2018 at 3:17 PM
> *To:* "Thakrar, Jayesh" <jthak...@conversantmedia.com>
> *Cc:* Wenchen Fan <cloud0...@gmail.com>, Hyukjin Kwon <gurwls...@gmail.com>, Spark Dev List <dev@spark.apache.org>
> *Subject:* Re: data source api v2 refactoring
>
> Hi Jayesh,
>
> The existing sources haven't been ported to v2 yet. That is going to be tricky, because the existing sources implement behaviors that we need to keep for now.
>
> I wrote up an SPIP to standardize logical plans while moving to the v2 sources. The reason we need this is that too much is delegated to sources today. For example, sources are handed a SaveMode to overwrite data, but what exactly gets overwritten isn't defined and it varies by the source that gets used. That's not a good thing, and we want to clean up what happens so that users know a query behaves the same way across all v2 sources. CTAS shouldn't succeed for one source but fail for another if the table already exists.
>
> Standardizing plans makes it difficult to port the existing sources to v2, because we need to implement the behavior of the v2 plans, which may not be the existing v1 behavior. I think what we should do is keep the existing v1 sources working as they do today, and add a way to opt in to v2 behavior. One good way to do this is to use a new write API that is more clear; I proposed one in the SPIP I mentioned earlier. SQL is a bit easier because the behavior for SQL is fairly well defined. The problem is mostly with the existing DF write API, DataFrameWriter.
>
> It would be great to open a discussion on this list about the compatibility between v1 and v2 and come up with a plan.
>
> rb
>
> On Fri, Sep 7, 2018 at 2:12 PM Thakrar, Jayesh <jthak...@conversantmedia.com> wrote:
>
> Ryan et al,
>
> Wondering if the existing Spark-based data sources (e.g. for HDFS, Kafka) have been ported to V2. I remember reading threads with discussions about the inefficiency/overhead of converting from Row to InternalRow that was preventing certain porting efforts, etc.
>
> I ask because those are the most widely used data sources and have a lot of effort and thinking behind them, and if they have been ported over to V2, they can serve as excellent production examples of the V2 API.
>
> Thanks,
> Jayesh
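To illustrate the DataFrameWriter ambiguity Ryan mentions above: the same overwrite call can mean different things depending on the source, because the meaning of SaveMode.Overwrite is left to each implementation. A minimal example (the path and app name are placeholders):

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().master("local[*]").appName("savemode-illustration").getOrCreate()
val df = spark.range(10).toDF("id")

// With the v1 write API, the source decides what "overwrite" means: one
// source may drop and recreate everything under the path or table, while
// another may only replace the data it considers matching.
df.write.mode(SaveMode.Overwrite).format("parquet").save("/tmp/savemode-example")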
> *From:* Ryan Blue <rb...@netflix.com.INVALID>
> *Reply-To:* <rb...@netflix.com>
> *Date:* Friday, September 7, 2018 at 2:19 PM
> *To:* Wenchen Fan <cloud0...@gmail.com>
> *Cc:* Hyukjin Kwon <gurwls...@gmail.com>, Spark Dev List <dev@spark.apache.org>
> *Subject:* Re: data source api v2 refactoring
>
> There are a few v2-related changes that we can work on in parallel, at least for reviews:
>
> * SPARK-25006, #21978 <https://github.com/apache/spark/pull/21978>: Add catalog to TableIdentifier - this proposes how to incrementally add multi-catalog support without breaking existing code paths
> * SPARK-24253, #21308 <https://github.com/apache/spark/pull/21308>: Add DeleteSupport API - this is a small API addition, which doesn't affect the refactor
> * SPARK-24252, #21306 <https://github.com/apache/spark/pull/21306>: Add v2 Catalog API - this is a different way to create v2 tables, and also doesn't affect the refactor
>
> I agree that the PR for adding SQL support should probably wait on the refactor. I have also been meaning to share our implementation, which isn't based on the refactor. It handles CTAS, RTAS, InsertInto, DeleteFrom, and AlterTable from both SQL and the other methods in the DF API, saveAsTable and insertInto. It follows the structure that I proposed on the SQL support PR to convert SQL plans to v2 plans, and uses the new TableCatalog to implement CTAS and RTAS.
>
> rb
>
> On Fri, Sep 7, 2018 at 12:27 AM Wenchen Fan <cloud0...@gmail.com> wrote:
>
> Hi Ryan,
>
> You are right that the `LogicalWrite` mirrors the read side API. I just don't have a good name for it yet, and the write side changes will be a different PR.
>
> Hi Hyukjin,
>
> That's my expectation, otherwise we keep rebasing the refactor PR and never get it done.
>
> On Fri, Sep 7, 2018 at 3:02 PM Hyukjin Kwon <gurwls...@gmail.com> wrote:
>
> BTW, just for clarification, do we hold Datasource V2 related PRs for now until we finish this refactoring?
>
> On Fri, Sep 7, 2018 at 12:52 AM Ryan Blue <rb...@netflix.com.invalid> wrote:
>
> Wenchen,
>
> I'm not really sure what you're proposing here. What is a `LogicalWrite`? Is it something that mirrors the read side in your PR?
>
> I think I agree that if we have a Write independent of the Table that carries the commit and abort methods, then we can create it directly without a WriteConfig. So I tentatively agree with what you propose, assuming that I understand it correctly.
>
> rb
>
> On Tue, Sep 4, 2018 at 8:42 PM Wenchen Fan <cloud0...@gmail.com> wrote:
>
> I'm switching to another Gmail account, let's see if it still gets dropped this time.
>
> Hi Ryan,
>
> I'm thinking about the write path and feel the abstraction should be the same.
>
> We still have logical and physical writing, and the table can create different logical writes based on how to write, e.g. append, delete, replaceWhere, etc.
>
> One thing I'm not sure about is the WriteConfig. With WriteConfig, the API would look like:
>
> trait Table {
>   WriteConfig newAppendWriteConfig();
>   WriteConfig newDeleteWriteConfig(deleteExprs);
>   LogicalWrite newLogicalWrite(writeConfig);
> }
>
> Without WriteConfig, the API looks like:
>
> trait Table {
>   LogicalWrite newAppendWrite();
>   LogicalWrite newDeleteWrite(deleteExprs);
> }
>
> It looks to me that the API is simpler without WriteConfig. What do you think?
>
> Thanks,
> Wenchen
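For comparison, a caller-side sketch in Scala of the simpler variant above (without WriteConfig), assuming LogicalWrite carries the commit/abort methods as discussed; all names come from the sketches in this thread, not a committed API:

// Hypothetical interfaces, mirroring the "without WriteConfig" sketch above.
trait LogicalWrite { def commit(): Unit; def abort(): Unit }
trait Table { def newAppendWrite(): LogicalWrite }

// Driver-side usage: create the write, run tasks, then commit or abort.
def appendTo(table: Table): Unit = {
  val write: LogicalWrite = table.newAppendWrite()
  try {
    // ... launch write tasks on executors and collect their commit messages ...
    write.commit()
  } catch {
    case e: Throwable =>
      write.abort()  // clean up any partial output on failure
      throw e
  }
}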
> On Wed, Sep 5, 2018 at 4:24 AM Ryan Blue <rb...@netflix.com.invalid> wrote:
>
> Latest from Wenchen in case it was dropped.
>
> ---------- Forwarded message ---------
> From: Wenchen Fan <wenc...@databricks.com>
> Date: Mon, Sep 3, 2018 at 6:16 AM
> Subject: Re: data source api v2 refactoring
> To: <mri...@gmail.com>
> Cc: Ryan Blue <rb...@netflix.com>, Reynold Xin <r...@databricks.com>, <dev@spark.apache.org>
>
> Hi Mridul,
>
> I'm not sure what's going on; my email was CC'ed to the dev list.
>
> Hi Ryan,
>
> The logical and physical scan idea sounds good. To add more color to Jungtaek's question: both micro-batch and continuous mode have the logical and physical scan, but there is a difference. For micro-batch mode, a physical scan outputs data for one epoch, but that's not true for continuous mode.
>
> I'm not sure whether it's necessary to include the streaming epoch in the API abstraction, for features like metrics reporting.
>
> On Sun, Sep 2, 2018 at 12:31 PM Mridul Muralidharan <mri...@gmail.com> wrote:
>
> Is it only me, or are all others getting Wenchen's mails? (Obviously Ryan did :-) ) I did not see it in the mail thread I received or in the archives [1]. Wondering which other senders were getting dropped (if any).
>
> Regards,
> Mridul
>
> [1] http://apache-spark-developers-list.1001551.n3.nabble.com/data-source-api-v2-refactoring-td24848.html
>
> On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue <rb...@netflix.com.invalid> wrote:
>
> Thanks for clarifying, Wenchen. I think that's what I expected.
>
> As for the abstraction, here's the way that I think about it. There are two important parts of a scan: the definition of what will be read, and the task sets that actually perform the read. In batch, there's one definition of the scan and one task set, so it makes sense that there's one scan object that encapsulates both of these concepts. For streaming, we need to separate the two into the definition of what will be read (the stream or streaming read) and the task sets that are run (scans). That way, the streaming read behaves like a factory for scans, producing scans that handle the data either in micro-batches or using continuous tasks.
>
> To address Jungtaek's question, I think that this does work with continuous. In continuous mode, the query operators keep running and send data to one another directly. The API still needs a streaming read layer because it may still produce more than one continuous scan. That would happen when the underlying source changes and Spark needs to reconfigure. I think the example here is when partitioning in a Kafka topic changes and Spark needs to re-map Kafka partitions to continuous tasks.
>
> rb
>
> On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan <wenc...@databricks.com> wrote:
>
> Hi Ryan,
>
> Sorry, I may have used the wrong wording. The pushdown is done with ScanConfig, which is not table/stream/scan but something between them. The table creates a ScanConfigBuilder, and the table creates the stream/scan with the ScanConfig. For a streaming source, the stream is the one that takes care of the pushdown result; for a batch source, it's the scan.
>
> It's a little tricky because stream is an abstraction for streaming sources only. Better ideas are welcome!
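A rough Scala sketch of that flow, with hypothetical interface names that mirror the discussion (the builder captures the pushdown, and the table then creates a scan or stream from the built ScanConfig); this is illustrative only, not a committed API:

import org.apache.spark.sql.sources.Filter

// Hypothetical sketch of how pushdown could be captured, per the description above.
trait ScanConfig {
  def pushedFilters: Array[Filter]   // the pushdown result the stream/scan consumes
  def options: Map[String, String]
}
trait ScanConfigBuilder {
  def pushFilters(filters: Array[Filter]): ScanConfigBuilder
  def build(): ScanConfig
}
// The table would expose newScanConfigBuilder(), then create either a batch
// scan or a stream from the built ScanConfig:
//   val config = table.newScanConfigBuilder().pushFilters(filters).build()
//   val scan   = table.createScan(config)      // batch: the scan owns the pushdown result
//   val stream = table.createStream(config)    // streaming: the stream owns it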
> On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue <rb...@netflix.com> wrote:
>
> Thanks, Reynold!
>
> I think your API sketch looks great. I appreciate having the Table level in the abstraction to plug into as well. I think this makes it clear what everything does, particularly having the Stream level that represents a configured (by ScanConfig) streaming read and can act as a factory for individual batch scans or for continuous scans.
>
> Wenchen, I'm not sure what you mean by doing pushdown at the table level. It seems to mean that pushdown is specific to a batch scan or streaming read, which seems to be what you're saying as well. Wouldn't the pushdown happen to create a ScanConfig, which is then used as Reynold suggests? Looking forward to seeing this PR when you get it posted. Thanks for all of your work on this!
>
> rb
>
> On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan <wenc...@databricks.com> wrote:
>
> Thanks, Reynold, for writing this and starting the discussion!
>
> Data source v2 was started with batch only, so we didn't pay much attention to the abstraction and just followed the v1 API. Now that we are designing the streaming API and catalog integration, the abstraction becomes super important.
>
> I like this proposed abstraction and have successfully prototyped it to make sure it works.
>
> During prototyping, I had to work around the issue that the current streaming engine does query optimization/planning for each micro-batch. With this abstraction, operator pushdown is only applied once per query. In my prototype, I do the physical planning up front to get the pushdown result, add a logical linking node that wraps the resulting physical plan node for the data source, and then swap that logical linking node into the logical plan for each batch. In the future we should just let the streaming engine do query optimization/planning only once.
>
> About pushdown, I think we should do it at the table level. The table should create a new pushdown handler to apply operator pushdown for each scan/stream, and create the scan/stream with the pushdown result. The rationale is that a table should have the same pushdown behavior regardless of the scan node.
>
> Thanks,
> Wenchen
> On Fri, Aug 31, 2018 at 2:00 PM Reynold Xin <r...@databricks.com> wrote:
>
> I spent some time last week looking at the current data source v2 APIs, and I thought we should be a bit more buttoned up in terms of the abstractions and the guarantees Spark provides. In particular, I feel we need the following levels of "abstractions" to fit the use cases in Spark, from batch to streaming.
>
> Please don't focus on the naming at this stage. When possible, I draw parallels to what similar levels are named in the currently committed API:
>
> 0. Format: This represents a specific format, e.g. Parquet, ORC. There is currently no explicit class at this level.
>
> 1. Table: This should represent a logical dataset (with schema). This could be just a directory on the file system, or a table in the catalog. Operations on tables can include batch reads (Scan), streams, writes, and potentially other operations such as deletes. The closest thing to the table-level abstraction in the current code base is the "Provider" class, although Provider isn't quite a Table. This is similar to Ryan's proposed design.
>
> 2. Stream: Specific to streaming. A stream is created out of a Table. This logically represents an instance of a StreamingQuery. Pushdowns and options are handled at this layer; i.e. Spark guarantees to data source implementations that pushdowns and options don't change within a Stream. Each Stream consists of a sequence of scans. There is no equivalent concept in the currently committed code.
>
> 3. Scan: A physical scan, either as part of a streaming query or a batch query. This should contain sufficient information and methods so that we can run a Spark job over a defined subset of the table. It's functionally equivalent to an RDD, except there's no dependency on RDD, so it is a smaller surface. In the current code, the equivalent class would be the ScanConfig, which represents the information needed; but in order to execute a job, ReadSupport is needed (various methods in ReadSupport take a ScanConfig).
>
> To illustrate with pseudocode what the different levels mean, a batch query would look like the following:
>
> val provider = reflection[Format]("parquet")
> val table = provider.createTable(options)
> val scan = table.createScan(scanConfig) // scanConfig includes pushdown and options
> // run tasks on executors
>
> A streaming micro-batch scan would look like the following:
>
> val provider = reflection[Format]("parquet")
> val table = provider.createTable(options)
> val stream = table.createStream(scanConfig)
>
> while(true) {
>   val scan = stream.createScan(startOffset)
>   // run tasks on executors
> }
>
> Vs. the current API, the above:
>
> 1. Creates an explicit Table abstraction, and an explicit Scan abstraction.
>
> 2. Has an explicit Stream level and makes it clear that pushdowns and options are handled there, rather than at the individual scan (ReadSupport) level. Data source implementations don't need to worry about pushdowns or options changing mid-stream. For batch, those happen when the scan object is created.
>
> This email is just a high-level sketch. I've asked Wenchen to prototype this, to see if it is actually feasible and the degree of hacks it removes, or creates.
>
> --
> Ryan Blue
> Software Engineer
> Netflix
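To make the levels above concrete, here is a small self-contained Scala sketch of Reynold's pseudocode, with a trivial in-memory implementation just to show the flow; all names are illustrative assumptions from this thread, not a committed API:

// Hypothetical interfaces for the four levels sketched above.
trait ScanConfig                          // pushdown + options, fixed per scan/stream
trait Scan {                              // level 3: a physical scan (one Spark job)
  def planTasks(): Seq[String]            // stand-in for task planning
}
trait Stream {                            // level 2: a configured streaming read
  def createScan(startOffset: Long): Scan
}
trait Table {                             // level 1: a logical dataset
  def createScan(config: ScanConfig): Scan
  def createStream(config: ScanConfig): Stream
}
trait Format {                            // level 0: e.g. parquet, orc
  def createTable(options: Map[String, String]): Table
}

// A trivial in-memory implementation, only to exercise the abstractions.
object ExampleFormat extends Format {
  override def createTable(options: Map[String, String]): Table = new Table {
    override def createScan(config: ScanConfig): Scan = new Scan {
      override def planTasks(): Seq[String] = Seq("task-0", "task-1")
    }
    override def createStream(config: ScanConfig): Stream = new Stream {
      override def createScan(startOffset: Long): Scan = new Scan {
        override def planTasks(): Seq[String] = Seq(s"task-for-offset-$startOffset")
      }
    }
  }
}

object Demo extends App {
  val config = new ScanConfig {}              // would carry pushdown and options
  val table = ExampleFormat.createTable(Map.empty)

  // Batch: one scan per query.
  val batchScan = table.createScan(config)
  println(batchScan.planTasks())              // run these tasks on executors

  // Streaming micro-batch: the stream acts as a factory for per-epoch scans.
  val stream = table.createStream(config)
  for (offset <- 0L until 3L) {
    val scan = stream.createScan(offset)
    println(scan.planTasks())                 // run tasks for this micro-batch
  }
}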