Wenchen, I'm not really sure what you're proposing here. What is a `LogicalWrite`? Is it something that mirrors the read side in your PR?
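For concreteness, here's roughly the shape I have in mind when I read your sketch below -- all of these names (LogicalWrite, DataWriterFactory, WriterCommitMessage, and so on) are placeholders I'm assuming, not committed API:

    // Placeholder types for the sketch; the real API would flesh these out.
    trait Expression
    trait WriterCommitMessage
    trait DataWriterFactory

    // The write itself carries the driver-side commit/abort lifecycle,
    // so the table can hand it out directly without a WriteConfig.
    trait LogicalWrite {
      def createWriterFactory(): DataWriterFactory            // physical writers for executors
      def commit(messages: Array[WriterCommitMessage]): Unit
      def abort(messages: Array[WriterCommitMessage]): Unit
    }

    trait Table {
      def newAppendWrite(): LogicalWrite
      def newDeleteWrite(deleteExprs: Array[Expression]): LogicalWrite
    }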
I think that I agree that if we have a Write independent of the Table that carries the commit and abort methods, then we can create it directly without a WriteConfig. So I tentatively agree with what you propose, assuming that I understand it correctly.

rb

On Tue, Sep 4, 2018 at 8:42 PM Wenchen Fan <cloud0...@gmail.com> wrote:

> I'm switching to another Gmail account, let's see if it still gets dropped this time.
>
> Hi Ryan,
>
> I'm thinking about the write path and I feel the abstraction should be the same.
>
> We still have logical and physical writes, and the table can create different logical writes based on how to write, e.g. append, delete, replaceWhere, etc.
>
> One thing I'm not sure about is the WriteConfig. With the WriteConfig, the API would look like
>
> trait Table {
>   WriteConfig newAppendWriteConfig();
>   WriteConfig newDeleteWriteConfig(deleteExprs);
>   LogicalWrite newLogicalWrite(writeConfig);
> }
>
> Without WriteConfig, the API looks like
>
> trait Table {
>   LogicalWrite newAppendWrite();
>   LogicalWrite newDeleteWrite(deleteExprs);
> }
>
> It looks to me like the API is simpler without WriteConfig. What do you think?
>
> Thanks,
> Wenchen
>
> On Wed, Sep 5, 2018 at 4:24 AM Ryan Blue <rb...@netflix.com.invalid> wrote:
>
>> Latest from Wenchen in case it was dropped.
>>
>> ---------- Forwarded message ---------
>> From: Wenchen Fan <wenc...@databricks.com>
>> Date: Mon, Sep 3, 2018 at 6:16 AM
>> Subject: Re: data source api v2 refactoring
>> To: <mri...@gmail.com>
>> Cc: Ryan Blue <rb...@netflix.com>, Reynold Xin <r...@databricks.com>, <dev@spark.apache.org>
>>
>> Hi Mridul,
>>
>> I'm not sure what's going on; my email was CC'ed to the dev list.
>>
>> Hi Ryan,
>>
>> The logical and physical scan idea sounds good. To add more color to Jungtaek's question: both micro-batch and continuous mode have the logical and physical scan, but there is a difference. In micro-batch mode, a physical scan outputs data for one epoch, but that's not true for continuous mode.
>>
>> I'm not sure if it's necessary to include the streaming epoch in the API abstraction, for features like metrics reporting.
>>
>> On Sun, Sep 2, 2018 at 12:31 PM Mridul Muralidharan <mri...@gmail.com> wrote:
>>
>>> Is it only me, or are all others getting Wenchen's mails? (Obviously Ryan did :-) )
>>> I did not see it in the mail thread I received or in the archives ... [1]
>>> Wondering which other senders were getting dropped (if any).
>>>
>>> Regards,
>>> Mridul
>>>
>>> [1] http://apache-spark-developers-list.1001551.n3.nabble.com/data-source-api-v2-refactoring-td24848.html
>>>
>>> On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue <rb...@netflix.com.invalid> wrote:
>>>
>>>> Thanks for clarifying, Wenchen. I think that's what I expected.
>>>>
>>>> As for the abstraction, here's the way that I think about it: there are two important parts of a scan: the definition of what will be read, and the task sets that actually perform the read. In batch, there's one definition of the scan and one task set, so it makes sense that there's one scan object that encapsulates both of these concepts. For streaming, we need to separate the two into the definition of what will be read (the stream or streaming read) and the task sets that are run (scans). That way, the streaming read behaves like a factory for scans, producing scans that handle the data either in micro-batches or using continuous tasks.
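(To make that factory relationship concrete, a rough sketch with made-up names -- the streaming read is the definition of what will be read, and each scan it produces is one runnable task set:)

    // Made-up names, sketch only.
    trait Offset
    trait ScanTask
    trait Scan {                       // one task set that actually performs the read
      def planTasks(): Array[ScanTask]
    }
    trait StreamingRead {              // the definition of what will be read
      def createMicroBatchScan(start: Offset, end: Offset): Scan
      def createContinuousScan(start: Offset): Scan
    }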
>>>> To address Jungtaek's question, I think that this does work with continuous. In continuous mode, the query operators keep running and send data to one another directly. The API still needs a streaming read layer because it may still produce more than one continuous scan. That would happen when the underlying source changes and Spark needs to reconfigure. I think the example here is when the partitioning of a Kafka topic changes and Spark needs to re-map Kafka partitions to continuous tasks.
>>>>
>>>> rb
>>>>
>>>> On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan <wenc...@databricks.com> wrote:
>>>>
>>>>> Hi Ryan,
>>>>>
>>>>> Sorry, I may have used the wrong wording. The pushdown is done with a ScanConfig, which is not the table/stream/scan but something between them. The table creates a ScanConfigBuilder, and the table creates the stream/scan with the ScanConfig. For a streaming source, the stream is the one that takes care of the pushdown result; for a batch source, it's the scan.
>>>>>
>>>>> It's a little tricky because the stream is an abstraction for streaming sources only. Better ideas are welcome!
>>>>>
>>>>> On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue <rb...@netflix.com> wrote:
>>>>>
>>>>>> Thanks, Reynold!
>>>>>>
>>>>>> I think your API sketch looks great. I appreciate having the Table level in the abstraction to plug into as well. I think this makes it clear what everything does, particularly having the Stream level that represents a configured (by ScanConfig) streaming read and can act as a factory for individual batch scans or for continuous scans.
>>>>>>
>>>>>> Wenchen, I'm not sure what you mean by doing pushdown at the table level. It seems to mean that pushdown is specific to a batch scan or streaming read, which seems to be what you're saying as well. Wouldn't the pushdown happen to create a ScanConfig, which is then used as Reynold suggests? Looking forward to seeing this PR when you get it posted. Thanks for all of your work on this!
>>>>>>
>>>>>> rb
>>>>>>
>>>>>> On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan <wenc...@databricks.com> wrote:
>>>>>>
>>>>>>> Thanks, Reynold, for writing this up and starting the discussion!
>>>>>>>
>>>>>>> Data source v2 started with batch only, so we didn't pay much attention to the abstraction and just followed the v1 API. Now that we are designing the streaming API and catalog integration, the abstraction becomes super important.
>>>>>>>
>>>>>>> I like this proposed abstraction and have successfully prototyped it to make sure it works.
>>>>>>>
>>>>>>> During prototyping, I had to work around the issue that the current streaming engine does query optimization/planning for each micro-batch. With this abstraction, the operator pushdown is only applied once per query. In my prototype, I do the physical planning up front to get the pushdown result, add a logical linking node that wraps the resulting physical plan node for the data source, and then swap that logical linking node into the logical plan for each batch. In the future we should just let the streaming engine do query optimization/planning only once.
>>>>>>>
>>>>>>> About pushdown, I think we should do it at the table level.
>>>>>>> The table should create a new pushdown handler to apply operator pushdown for each scan/stream, and create the scan/stream with the pushdown result. The rationale is that a table should have the same pushdown behavior regardless of the scan node.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Wenchen
>>>>>>>
>>>>>>> On Fri, Aug 31, 2018 at 2:00 PM Reynold Xin <r...@databricks.com> wrote:
>>>>>>>
>>>>>>>> I spent some time last week looking at the current data source v2 APIs, and I thought we should be a bit more buttoned up in terms of the abstractions and the guarantees Spark provides. In particular, I feel we need the following levels of "abstractions" to fit the use cases in Spark, from batch to streaming.
>>>>>>>>
>>>>>>>> Please don't focus on the naming at this stage. When possible, I draw parallels to what similar levels are named in the currently committed API:
>>>>>>>>
>>>>>>>> 0. Format: This represents a specific format, e.g. Parquet, ORC. There is currently no explicit class at this level.
>>>>>>>>
>>>>>>>> 1. Table: This should represent a logical dataset (with schema). This could be just a directory on the file system, or a table in the catalog. Operations on tables can include batch reads (Scan), streams, writes, and potentially other operations such as deletes. The closest to the table-level abstraction in the current code base is the "Provider" class, although Provider isn't quite a Table. This is similar to Ryan's proposed design.
>>>>>>>>
>>>>>>>> 2. Stream: Specific to streaming. A stream is created out of a Table. This logically represents an instance of a StreamingQuery. Pushdowns and options are handled at this layer, i.e. Spark guarantees to the data source implementation that pushdowns and options don't change within a Stream. Each Stream consists of a sequence of scans. There is no equivalent concept in the currently committed code.
>>>>>>>>
>>>>>>>> 3. Scan: A physical scan -- either as part of a streaming query or a batch query. This should contain sufficient information and methods so we can run a Spark job over a defined subset of the table. It's functionally equivalent to an RDD, except there's no dependency on RDD, so it is a smaller surface. In the current code, the equivalent class would be the ScanConfig, which represents the information needed, but in order to execute a job, ReadSupport is needed (various methods in ReadSupport take a ScanConfig).
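(Roughly, those four levels could be read as interfaces like the following -- a sketch with illustrative names and signatures, not the committed classes:)

    // Illustrative names and signatures only.
    trait Offset
    trait InputPartition
    trait ScanConfig                              // carries the pushdown result and options
    trait Scan {                                  // 3. enough to run one Spark job over a subset of the table
      def planInputPartitions(): Array[InputPartition]
    }
    trait Stream {                                // 2. one streaming query's read; pushdowns/options are fixed here
      def createScan(start: Offset): Scan
    }
    trait Table {                                 // 1. a logical dataset with a schema
      def createScan(config: ScanConfig): Scan        // batch read
      def createStream(config: ScanConfig): Stream    // streaming read
    }
    trait Format {                                // 0. e.g. parquet, orc
      def createTable(options: Map[String, String]): Table
    }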
>>>>>>>> To illustrate with pseudocode what the different levels mean, a batch query would look like the following:
>>>>>>>>
>>>>>>>> val provider = reflection[Format]("parquet")
>>>>>>>> val table = provider.createTable(options)
>>>>>>>> val scan = table.createScan(scanConfig)  // scanConfig includes pushdown and options
>>>>>>>> // run tasks on executors
>>>>>>>>
>>>>>>>> A streaming micro-batch scan would look like the following:
>>>>>>>>
>>>>>>>> val provider = reflection[Format]("parquet")
>>>>>>>> val table = provider.createTable(options)
>>>>>>>> val stream = table.createStream(scanConfig)
>>>>>>>>
>>>>>>>> while (true) {
>>>>>>>>   val scan = stream.createScan(startOffset)
>>>>>>>>   // run tasks on executors
>>>>>>>> }
>>>>>>>>
>>>>>>>> Compared to the current API, the above:
>>>>>>>>
>>>>>>>> 1. Creates an explicit Table abstraction, and an explicit Scan abstraction.
>>>>>>>>
>>>>>>>> 2. Has an explicit Stream level and makes it clear that pushdowns and options are handled there, rather than at the individual scan (ReadSupport) level. Data source implementations don't need to worry about pushdowns or options changing mid-stream. For batch, those happen when the scan object is created.
>>>>>>>>
>>>>>>>> This email is just a high-level sketch. I've asked Wenchen to prototype this, to see if it is actually feasible and the degree of hacks it removes, or creates.
>>>>>>
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Software Engineer
>>>>>> Netflix
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>

--
Ryan Blue
Software Engineer
Netflix