BTW, just for clarification: do we hold Data Source V2 related PRs for now, until we finish this refactoring?
On Fri, Sep 7, 2018 at 12:52 AM Ryan Blue <rb...@netflix.com.invalid> wrote:

Wenchen,

I'm not really sure what you're proposing here. What is a `LogicalWrite`? Is it something that mirrors the read side in your PR?

I think that I agree that if we have a Write independent of the Table that carries the commit and abort methods, then we can create it directly without a WriteConfig. So I tentatively agree with what you propose, assuming that I understand it correctly.

rb

On Tue, Sep 4, 2018 at 8:42 PM Wenchen Fan <cloud0...@gmail.com> wrote:

I'm switching to another Gmail account; let's see if it still gets dropped this time.

Hi Ryan,

I'm thinking about the write path and feel the abstraction should be the same.

We still have logical and physical writing, and the table can create different logical writes based on how to write, e.g. append, delete, replaceWhere, etc.

One thing I'm not sure about is the WriteConfig. With WriteConfig, the API would look like

trait Table {
  WriteConfig newAppendWriteConfig();

  WriteConfig newDeleteWriteConfig(deleteExprs);

  LogicalWrite newLogicalWrite(writeConfig);
}

Without WriteConfig, the API looks like

trait Table {
  LogicalWrite newAppendWrite();

  LogicalWrite newDeleteWrite(deleteExprs);
}

It looks to me that the API is simpler without WriteConfig. What do you think?

Thanks,
Wenchen
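For illustration, here is a minimal, self-contained Scala sketch of the WriteConfig-free option, assuming the write side mirrors the read side and that the LogicalWrite carries the job-level commit and abort methods Ryan mentions. Only Table, LogicalWrite, newAppendWrite, and newDeleteWrite come from the emails above; the placeholder types and the method signatures are assumptions, not part of the proposal.

// Placeholder types, declared only so the sketch is self-contained.
trait Expression
trait WriterCommitMessage
trait DataWriterFactory

trait Table {
  // The table creates a different LogicalWrite per way of writing.
  def newAppendWrite(): LogicalWrite
  def newDeleteWrite(deleteExprs: Array[Expression]): LogicalWrite
}

trait LogicalWrite {
  // Mirrors the read side: the logical write produces the physical writers
  // and owns job-level commit/abort, so no separate WriteConfig is needed.
  def createWriterFactory(): DataWriterFactory
  def commit(messages: Array[WriterCommitMessage]): Unit
  def abort(messages: Array[WriterCommitMessage]): Unit
}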
On Wed, Sep 5, 2018 at 4:24 AM Ryan Blue <rb...@netflix.com.invalid> wrote:

Latest from Wenchen in case it was dropped.

---------- Forwarded message ---------
From: Wenchen Fan <wenc...@databricks.com>
Date: Mon, Sep 3, 2018 at 6:16 AM
Subject: Re: data source api v2 refactoring
To: <mri...@gmail.com>
Cc: Ryan Blue <rb...@netflix.com>, Reynold Xin <r...@databricks.com>, <dev@spark.apache.org>

Hi Mridul,

I'm not sure what's going on; my email was CC'ed to the dev list.

Hi Ryan,

The logical and physical scan idea sounds good. To add more color to Jungtaek's question: both micro-batch and continuous mode have the logical and physical scan, but there is a difference. In micro-batch mode a physical scan outputs data for one epoch, but that's not true for continuous mode.

I'm not sure if it's necessary to include the streaming epoch in the API abstraction, for features like metrics reporting.

On Sun, Sep 2, 2018 at 12:31 PM Mridul Muralidharan <mri...@gmail.com> wrote:

Is it only me, or are all others getting Wenchen's mails? (Obviously Ryan did :-) )
I did not see it in the mail thread I received or in the archives [1].
Wondering which other senders were getting dropped (if any).

Regards,
Mridul

[1] http://apache-spark-developers-list.1001551.n3.nabble.com/data-source-api-v2-refactoring-td24848.html

On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue <rb...@netflix.com.invalid> wrote:

Thanks for clarifying, Wenchen. I think that's what I expected.

As for the abstraction, here's the way that I think about it: there are two important parts of a scan: the definition of what will be read, and the task sets that actually perform the read. In batch, there's one definition of the scan and one task set, so it makes sense that there's one scan object that encapsulates both of these concepts. For streaming, we need to separate the two into the definition of what will be read (the stream or streaming read) and the task sets that are run (scans). That way, the streaming read behaves like a factory for scans, producing scans that handle the data either in micro-batches or using continuous tasks.

To address Jungtaek's question, I think that this does work with continuous mode. In continuous mode, the query operators keep running and send data to one another directly. The API still needs a streaming read layer because it may still produce more than one continuous scan. That would happen when the underlying source changes and Spark needs to reconfigure. I think the example here is when partitioning in a Kafka topic changes and Spark needs to re-map Kafka partitions to continuous tasks.

rb

On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan <wenc...@databricks.com> wrote:

Hi Ryan,

Sorry, I may have used the wrong wording. The pushdown is done with ScanConfig, which is not the table/stream/scan, but something between them. The table creates a ScanConfigBuilder, and the table creates the stream/scan with the ScanConfig. For a streaming source, the stream is the one that takes care of the pushdown result; for a batch source, it's the scan.

It's a little tricky because the stream is an abstraction for streaming sources only. Better ideas are welcome!

On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue <rb...@netflix.com> wrote:

Thanks, Reynold!

I think your API sketch looks great. I appreciate having the Table level in the abstraction to plug into as well. I think this makes it clear what everything does, particularly having the Stream level that represents a configured (by ScanConfig) streaming read and can act as a factory for individual batch scans or for continuous scans.

Wenchen, I'm not sure what you mean by doing pushdown at the table level. It seems to mean that pushdown is specific to a batch scan or streaming read, which seems to be what you're saying as well. Wouldn't the pushdown happen to create a ScanConfig, which is then used as Reynold suggests? Looking forward to seeing this PR when you get it posted. Thanks for all of your work on this!

rb
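To make that flow concrete, here is a rough, self-contained Scala sketch of pushdown producing a ScanConfig that is then used to create the scan or stream, as Ryan describes. Only Table, ScanConfigBuilder, ScanConfig, Stream, and Scan come from the discussion above; the placeholder types, method names, and signatures are assumptions for illustration, not the committed API.

// Placeholder types, declared only so the sketch is self-contained.
trait Filter
trait StructType
trait Offset

trait ScanConfig // immutable result of pushdown (filters, pruned columns, options)

trait ScanConfigBuilder {
  def pushFilters(filters: Seq[Filter]): ScanConfigBuilder
  def pruneColumns(requiredSchema: StructType): ScanConfigBuilder
  def build(): ScanConfig
}

trait Scan // one batch job, one micro-batch, or one continuous task set

trait Stream {
  // A streaming read configured by a ScanConfig; acts as a factory for scans.
  def createScan(start: Offset): Scan
}

trait Table {
  def newScanConfigBuilder(): ScanConfigBuilder
  // Pushdown happens once, to build the ScanConfig, before the scan or stream
  // is created -- so it cannot change mid-stream.
  def createScan(config: ScanConfig): Scan      // batch: the scan holds the pushdown result
  def createStream(config: ScanConfig): Stream  // streaming: the stream holds it
}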
On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan <wenc...@databricks.com> wrote:

Thanks, Reynold, for writing this and starting the discussion!

Data source v2 was started with batch only, so we didn't pay much attention to the abstraction and just followed the v1 API. Now that we are designing the streaming API and catalog integration, the abstraction becomes super important.

I like this proposed abstraction and have successfully prototyped it to make sure it works.

During prototyping, I had to work around the issue that the current streaming engine does query optimization/planning for each micro-batch. With this abstraction, operator pushdown is only applied once per query. In my prototype, I do the physical planning up front to get the pushdown result, add a logical linking node that wraps the resulting physical plan node for the data source, and then swap that logical linking node into the logical plan for each batch. In the future we should just let the streaming engine do query optimization/planning only once.

About pushdown, I think we should do it at the table level. The table should create a new pushdown handler to apply operator pushdown for each scan/stream, and create the scan/stream with the pushdown result. The rationale is that a table should have the same pushdown behavior regardless of the scan node.

Thanks,
Wenchen

On Fri, Aug 31, 2018 at 2:00 PM Reynold Xin <r...@databricks.com> wrote:

I spent some time last week looking at the current data source v2 APIs, and I thought we should be a bit more buttoned up in terms of the abstractions and the guarantees Spark provides. In particular, I feel we need the following levels of "abstractions" to fit the use cases in Spark, from batch to streaming.

Please don't focus on the naming at this stage. When possible, I draw parallels to what similar levels are named in the currently committed API:

0. Format: This represents a specific format, e.g. Parquet, ORC. There is currently no explicit class at this level.

1. Table: This should represent a logical dataset (with schema). This could be just a directory on the file system, or a table in the catalog. Operations on tables can include batch reads (Scan), streams, writes, and potentially other operations such as deletes. The closest to the table-level abstraction in the current code base is the "Provider" class, although Provider isn't quite a Table. This is similar to Ryan's proposed design.

2. Stream: Specific to streaming. A stream is created out of a Table. This logically represents an instance of a StreamingQuery. Pushdowns and options are handled at this layer, i.e. Spark guarantees to the data source implementation that pushdowns and options don't change within a Stream. Each Stream consists of a sequence of scans. There is no equivalent concept in the current committed code.

3. Scan: A physical scan -- either as part of a streaming query, or a batch query. This should contain sufficient information and methods so we can run a Spark job over a defined subset of the table. It's functionally equivalent to an RDD, except there's no dependency on RDD, so it is a smaller surface. In the current code, the equivalent class would be the ScanConfig, which represents the information needed; but in order to execute a job, ReadSupport is needed (various methods in ReadSupport take a ScanConfig).
To illustrate with pseudocode what the different levels mean, a batch query would look like the following:

val provider = reflection[Format]("parquet")
val table = provider.createTable(options)
val scan = table.createScan(scanConfig) // scanConfig includes pushdown and options
// run tasks on executors

A streaming micro-batch scan would look like the following:

val provider = reflection[Format]("parquet")
val table = provider.createTable(options)
val stream = table.createStream(scanConfig)

while (true) {
  val scan = stream.createScan(startOffset)
  // run tasks on executors
}

Compared with the current API, the above:

1. Creates an explicit Table abstraction and an explicit Scan abstraction.

2. Has an explicit Stream level and makes it clear that pushdowns and options are handled there, rather than at the individual scan (ReadSupport) level. Data source implementations don't need to worry about pushdowns or options changing mid-stream. For batch, those happen when the scan object is created.

This email is just a high-level sketch. I've asked Wenchen to prototype this, to see if it is actually feasible and the degree of hacks it removes, or creates.
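The pseudocode above covers batch and micro-batch, so here is a speculative, self-contained Scala sketch of how the continuous case might look under the same abstraction, following Ryan's point that a streaming read can produce more than one continuous scan when the source has to be reconfigured (for example, when a Kafka topic's partitioning changes). All names and methods here are assumptions for illustration, not the proposed API.

object ContinuousSketch {
  trait Scan {
    // Long-running continuous tasks; returns only when the scan is stopped,
    // either because the source needs reconfiguration or the query ends.
    def runContinuousTasks(): Unit
  }

  trait Stream {
    def createContinuousScan(): Scan
    // e.g. true when the number of Kafka partitions has changed
    def needsReconfiguration(): Boolean
  }

  def runContinuous(stream: Stream): Unit = {
    var done = false
    while (!done) {
      // Each reconfiguration produces a new continuous scan with an updated
      // partition-to-task mapping; otherwise the same scan keeps running.
      val scan = stream.createContinuousScan()
      scan.runContinuousTasks()
      done = !stream.needsReconfiguration()
    }
  }
}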