Is it only me, or are all the others getting Wenchen's mails? (Obviously Ryan did :-) ) I did not see it in the mail thread I received or in the archives ... [1] Wondering which other senders were getting dropped (if any).
Regards,
Mridul

[1] http://apache-spark-developers-list.1001551.n3.nabble.com/data-source-api-v2-refactoring-td24848.html

On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue <rb...@netflix.com.invalid> wrote:

> Thanks for clarifying, Wenchen. I think that's what I expected.
>
> As for the abstraction, here's the way that I think about it: there are two important parts of a scan: the definition of what will be read, and the task sets that actually perform the read. In batch, there's one definition of the scan and one task set, so it makes sense to have one scan object that encapsulates both concepts. For streaming, we need to separate the two into the definition of what will be read (the stream, or streaming read) and the task sets that are run (scans). That way, the streaming read behaves like a factory for scans, producing scans that handle the data either in micro-batches or using continuous tasks.
>
> To address Jungtaek's question, I think that this does work with continuous. In continuous mode, the query operators keep running and send data to one another directly. The API still needs a streaming read layer because it may still produce more than one continuous scan. That would happen when the underlying source changes and Spark needs to reconfigure. I think the example here is when the partitioning of a Kafka topic changes and Spark needs to re-map Kafka partitions to continuous tasks.
>
> rb
>
> On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan <wenc...@databricks.com> wrote:
>
>> Hi Ryan,
>>
>> Sorry, I may have used the wrong wording. The pushdown is done with ScanConfig, which is not the table/stream/scan but something between them. The table creates a ScanConfigBuilder, and the table creates the stream/scan with the ScanConfig. For a streaming source, the stream is the one that takes care of the pushdown result; for a batch source, it's the scan.
>>
>> It's a little tricky because the stream is an abstraction for streaming sources only. Better ideas are welcome!
>>
>> On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue <rb...@netflix.com> wrote:
>>
>>> Thanks, Reynold!
>>>
>>> I think your API sketch looks great. I appreciate having the Table level in the abstraction to plug into as well. I think this makes it clear what everything does, particularly having the Stream level that represents a configured (by ScanConfig) streaming read and can act as a factory for individual batch scans or for continuous scans.
>>>
>>> Wenchen, I'm not sure what you mean by doing pushdown at the table level. It seems to mean that pushdown is specific to a batch scan or streaming read, which seems to be what you're saying as well. Wouldn't the pushdown happen to create a ScanConfig, which is then used as Reynold suggests? Looking forward to seeing this PR when you get it posted. Thanks for all of your work on this!
>>>
>>> rb
>>>
>>> On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan <wenc...@databricks.com> wrote:
>>>
>>>> Thanks, Reynold, for writing this and starting the discussion!
>>>>
>>>> Data source v2 was started with batch only, so we didn't pay much attention to the abstraction and just followed the v1 API. Now that we are designing the streaming API and catalog integration, the abstraction becomes super important.
>>>>
>>>> I like this proposed abstraction and have successfully prototyped it to make sure it works.
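[To make the relationships described above concrete -- the table creates a ScanConfigBuilder, and the stream or scan is then created with the resulting ScanConfig, with the stream acting as a factory for scans -- here is a minimal Scala sketch. Every name here (Table, ScanConfigBuilder, ScanConfig, Stream, Scan) is an illustrative placeholder for this discussion, not the committed data source v2 API.]

    trait ScanConfig                  // immutable result of operator pushdown + options
    trait Scan                        // can run one Spark job over a subset of the table

    trait Stream {
      // The stream acts as a factory for scans: one per micro-batch, or a new
      // continuous scan when the source needs to be reconfigured.
      def createScan(startOffset: Long): Scan
    }

    trait ScanConfigBuilder {
      def pushFilters(filters: Seq[String]): ScanConfigBuilder   // simplified filter type
      def build(): ScanConfig
    }

    trait Table {
      def newScanConfigBuilder(): ScanConfigBuilder
      def createScan(config: ScanConfig): Scan      // batch source: the scan honors the pushdown result
      def createStream(config: ScanConfig): Stream  // streaming source: the stream honors it
    }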
>>>> During prototyping, I had to work around the issue that the current streaming engine does query optimization/planning for each micro-batch. With this abstraction, the operator pushdown is only applied once per query. In my prototype, I do the physical planning up front to get the pushdown result, add a logical linking node that wraps the resulting physical plan node for the data source, and then swap that logical linking node into the logical plan for each batch. In the future we should just let the streaming engine do query optimization/planning only once.
>>>>
>>>> About pushdown, I think we should do it at the table level. The table should create a new pushdown handler to apply operator pushdown for each scan/stream, and create the scan/stream with the pushdown result. The rationale is that a table should have the same pushdown behavior regardless of the scan node.
>>>>
>>>> Thanks,
>>>> Wenchen
>>>>
>>>> On Fri, Aug 31, 2018 at 2:00 PM Reynold Xin <r...@databricks.com> wrote:
>>>>
>>>>> I spent some time last week looking at the current data source v2 APIs, and I thought we should be a bit more buttoned up in terms of the abstractions and the guarantees Spark provides. In particular, I feel we need the following levels of "abstractions" to fit the use cases in Spark, from batch to streaming.
>>>>>
>>>>> Please don't focus on the naming at this stage. Where possible, I draw parallels to what similar levels are named in the currently committed API:
>>>>>
>>>>> 0. Format: This represents a specific format, e.g. Parquet or ORC. There is currently no explicit class at this level.
>>>>>
>>>>> 1. Table: This should represent a logical dataset (with schema). It could be just a directory on the file system, or a table in the catalog. Operations on tables can include batch reads (Scan), streams, writes, and potentially other operations such as deletes. The closest to the table-level abstraction in the current code base is the "Provider" class, although Provider isn't quite a Table. This is similar to Ryan's proposed design.
>>>>>
>>>>> 2. Stream: Specific to streaming. A stream is created out of a Table. This logically represents an instance of a StreamingQuery. Pushdowns and options are handled at this layer, i.e. Spark guarantees to the data source implementation that pushdowns and options don't change within a Stream. Each Stream consists of a sequence of scans. There is no equivalent concept in the currently committed code.
>>>>>
>>>>> 3. Scan: A physical scan -- either part of a streaming query or a batch query. This should contain sufficient information and methods so we can run a Spark job over a defined subset of the table. It's functionally equivalent to an RDD, except there's no dependency on RDD, so it is a smaller surface. In the current code, the equivalent class would be ScanConfig, which represents the information needed; but in order to execute a job, ReadSupport is needed (various methods in ReadSupport take a ScanConfig).
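[To make level 2's guarantee concrete -- pushdowns and options are fixed for the lifetime of a Stream, and each scan only adds an offset -- here is a small self-contained Scala sketch of what an implementation might look like. All names are placeholders chosen for this discussion, not the committed API.]

    trait ScanConfig                 // immutable pushdown + options result
    trait ReadTask                   // one task's worth of work on an executor
    trait Scan   { def planTasks(): Seq[ReadTask] }
    trait Stream { def createScan(startOffset: Long): Scan }

    // The ScanConfig is fixed at construction time, so the source sees exactly one
    // pushdown result for the whole streaming query; each micro-batch only supplies
    // a new start offset.
    class ExampleStream(config: ScanConfig) extends Stream {
      override def createScan(startOffset: Long): Scan = new ExampleScan(config, startOffset)
    }

    class ExampleScan(config: ScanConfig, startOffset: Long) extends Scan {
      override def planTasks(): Seq[ReadTask] = Seq.empty   // derive tasks from config + offset
    }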
>>>>> To illustrate with pseudocode what the different levels mean, a batch query would look like the following:
>>>>>
>>>>> val provider = reflection[Format]("parquet")
>>>>> val table = provider.createTable(options)
>>>>> val scan = table.createScan(scanConfig) // scanConfig includes pushdown and options
>>>>> // run tasks on executors
>>>>>
>>>>> A streaming micro-batch scan would look like the following:
>>>>>
>>>>> val provider = reflection[Format]("parquet")
>>>>> val table = provider.createTable(options)
>>>>> val stream = table.createStream(scanConfig)
>>>>>
>>>>> while (true) {
>>>>>   val scan = stream.createScan(startOffset)
>>>>>   // run tasks on executors
>>>>> }
>>>>>
>>>>> Compared with the current API, the above:
>>>>>
>>>>> 1. Creates an explicit Table abstraction and an explicit Scan abstraction.
>>>>>
>>>>> 2. Has an explicit Stream level and makes it clear that pushdowns and options are handled there, rather than at the individual scan (ReadSupport) level. Data source implementations don't need to worry about pushdowns or options changing mid-stream. For batch, those happen when the scan object is created.
>>>>>
>>>>> This email is just a high-level sketch. I've asked Wenchen to prototype this, to see if it is actually feasible and the degree of hacks it removes, or creates.
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>
> --
> Ryan Blue
> Software Engineer
> Netflix
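[For comparison, a continuous-mode counterpart to the micro-batch loop above might look like the following, reflecting Ryan's point earlier in the thread that a stream can still produce more than one continuous scan when the source must be reconfigured (e.g. a Kafka topic gains partitions). This is pseudocode in the same style as the sketch above; the method names (createContinuousScan, needsReconfiguration, latestOffset) are hypothetical, not the committed API.]

    val provider = reflection[Format]("kafka")
    val table = provider.createTable(options)
    val stream = table.createStream(scanConfig)

    var scan = stream.createContinuousScan(startOffset)
    // launch long-running tasks on executors
    while (true) {
      if (stream.needsReconfiguration()) {   // e.g. a Kafka topic gained partitions
        // stop the old tasks, then re-map partitions to new continuous tasks
        scan = stream.createContinuousScan(stream.latestOffset())
      }
    }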