BTW, do we hold Datasource V2 related PRs for now until we finish this
refactoring just for clarification?

2018년 9월 7일 (금) 오전 12:52, Ryan Blue <rb...@netflix.com.invalid>님이 작성:

> Wenchen,
>
> I'm not really sure what you're proposing here. What is a `LogicalWrite`?
> Is it something that mirrors the read side in your PR?
>
> I think that I agree that if we have a Write independent of the Table that
> carries the commit and abort methods, then we can create it directly
> without a WriteConfig. So I tentatively agree with what you propose,
> assuming that I understand it correctly.
>
> rb
>
> On Tue, Sep 4, 2018 at 8:42 PM Wenchen Fan <cloud0...@gmail.com> wrote:
>
>> I'm switching to my another Gmail account, let's see if it still gets
>> dropped this time.
>>
>> Hi Ryan,
>>
>> I'm thinking about the write path and feel the abstraction should be the
>> same.
>>
>> We still have logical and physical writing. And the table can create
>> different logical writing based on how to write. e.g., append, delete,
>> replaceWhere, etc.
>>
>> One thing I'm not sure about is the WriteConfig. With the WriteConfig,
>> the API would look like
>> trait Table {
>>   WriteConfig newAppendWriteConfig();
>>
>>   WriteConfig newDeleteWriteConfig(deleteExprs);
>>
>>   LogicalWrite newLogicalWrite(writeConfig);
>> }
>>
>> Without WriteConfig, the API looks like
>> trait Table {
>>   LogicalWrite newAppendWrite();
>>
>>   LogicalWrite newDeleteWrite(deleteExprs);
>> }
>>
>>
>> It looks to me that the API is simpler without WriteConfig, what do you
>> think?
>>
>> Thanks,
>> Wenchen
>>
>> On Wed, Sep 5, 2018 at 4:24 AM Ryan Blue <rb...@netflix.com.invalid>
>> wrote:
>>
>>> Latest from Wenchen in case it was dropped.
>>>
>>> ---------- Forwarded message ---------
>>> From: Wenchen Fan <wenc...@databricks.com>
>>> Date: Mon, Sep 3, 2018 at 6:16 AM
>>> Subject: Re: data source api v2 refactoring
>>> To: <mri...@gmail.com>
>>> Cc: Ryan Blue <rb...@netflix.com>, Reynold Xin <r...@databricks.com>, <
>>> dev@spark.apache.org>
>>>
>>>
>>> Hi Mridul,
>>>
>>> I'm not sure what's going on, my email was CC'ed to the dev list.
>>>
>>>
>>> Hi Ryan,
>>>
>>> The logical and physical scan idea sounds good. To add more color
>>> to Jungtaek's question, both micro-batch and continuous mode have
>>> the logical and physical scan, but there is a difference: for micro-batch
>>> mode, a physical scan outputs data for one epoch, but it's not true for
>>> continuous mode.
>>>
>>> I'm not sure if it's necessary to include streaming epoch in the API
>>> abstraction, for features like metrics reporting.
>>>
>>> On Sun, Sep 2, 2018 at 12:31 PM Mridul Muralidharan <mri...@gmail.com>
>>> wrote:
>>>
>>>>
>>>> Is it only me or are all others getting Wenchen’s mails ? (Obviously
>>>> Ryan did :-) )
>>>> I did not see it in the mail thread I received or in archives ... [1]
>>>> Wondering which othersenderswere getting dropped (if yes).
>>>>
>>>> Regards
>>>> Mridul
>>>>
>>>> [1]
>>>> http://apache-spark-developers-list.1001551.n3.nabble.com/data-source-api-v2-refactoring-td24848.html
>>>>
>>>>
>>>> On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue <rb...@netflix.com.invalid>
>>>> wrote:
>>>>
>>>>> Thanks for clarifying, Wenchen. I think that's what I expected.
>>>>>
>>>>> As for the abstraction, here's the way that I think about it: there
>>>>> are two important parts of a scan: the definition of what will be read, 
>>>>> and
>>>>> task sets that actually perform the read. In batch, there's one definition
>>>>> of the scan and one task set so it makes sense that there's one scan 
>>>>> object
>>>>> that encapsulates both of these concepts. For streaming, we need to
>>>>> separate the two into the definition of what will be read (the stream or
>>>>> streaming read) and the task sets that are run (scans). That way, the
>>>>> streaming read behaves like a factory for scans, producing scans that
>>>>> handle the data either in micro-batches or using continuous tasks.
>>>>>
>>>>> To address Jungtaek's question, I think that this does work with
>>>>> continuous. In continuous mode, the query operators keep running and send
>>>>> data to one another directly. The API still needs a streaming read layer
>>>>> because it may still produce more than one continuous scan. That would
>>>>> happen when the underlying source changes and Spark needs to reconfigure. 
>>>>> I
>>>>> think the example here is when partitioning in a Kafka topic changes and
>>>>> Spark needs to re-map Kafka partitions to continuous tasks.
>>>>>
>>>>> rb
>>>>>
>>>>> On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan <wenc...@databricks.com>
>>>>> wrote:
>>>>>
>>>>>> Hi Ryan,
>>>>>>
>>>>>> Sorry I may use a wrong wording. The pushdown is done with
>>>>>> ScanConfig, which is not table/stream/scan, but something between them. 
>>>>>> The
>>>>>> table creates ScanConfigBuilder, and table creates stream/scan with
>>>>>> ScanConfig. For streaming source, stream is the one to take care of the
>>>>>> pushdown result. For batch source, it's the scan.
>>>>>>
>>>>>> It's a little tricky because stream is an abstraction for streaming
>>>>>> source only. Better ideas are welcome!
>>>>>>
>>>>>
>>>>>> On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue <rb...@netflix.com> wrote:
>>>>>>
>>>>>>> Thanks, Reynold!
>>>>>>>
>>>>>>> I think your API sketch looks great. I appreciate having the Table
>>>>>>> level in the abstraction to plug into as well. I think this makes it 
>>>>>>> clear
>>>>>>> what everything does, particularly having the Stream level that 
>>>>>>> represents
>>>>>>> a configured (by ScanConfig) streaming read and can act as a factory for
>>>>>>> individual batch scans or for continuous scans.
>>>>>>>
>>>>>>> Wenchen, I'm not sure what you mean by doing pushdown at the table
>>>>>>> level. It seems to mean that pushdown is specific to a batch scan or
>>>>>>> streaming read, which seems to be what you're saying as well. Wouldn't 
>>>>>>> the
>>>>>>> pushdown happen to create a ScanConfig, which is then used as Reynold
>>>>>>> suggests? Looking forward to seeing this PR when you get it posted. 
>>>>>>> Thanks
>>>>>>> for all of your work on this!
>>>>>>>
>>>>>>> rb
>>>>>>>
>>>>>>> On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan <wenc...@databricks.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Thank Reynold for writing this and starting the discussion!
>>>>>>>>
>>>>>>>> Data source v2 was started with batch only, so we didn't pay much
>>>>>>>> attention to the abstraction and just follow the v1 API. Now we are
>>>>>>>> designing the streaming API and catalog integration, the abstraction
>>>>>>>> becomes super important.
>>>>>>>>
>>>>>>>> I like this proposed abstraction and have successfully prototyped
>>>>>>>> it to make sure it works.
>>>>>>>>
>>>>>>>> During prototyping, I have to work around the issue that the
>>>>>>>> current streaming engine does query optimization/planning for each 
>>>>>>>> micro
>>>>>>>> batch. With this abstraction, the operator pushdown is only applied 
>>>>>>>> once
>>>>>>>> per-query. In my prototype, I do the physical planning up front to get 
>>>>>>>> the
>>>>>>>> pushdown result, and
>>>>>>>> add a logical linking node that wraps the resulting physical plan
>>>>>>>> node for the data source, and then swap that logical linking node into 
>>>>>>>> the
>>>>>>>> logical plan for each batch. In the future we should just let the 
>>>>>>>> streaming
>>>>>>>> engine do query optimization/planning only once.
>>>>>>>>
>>>>>>>> About pushdown, I think we should do it at the table level. The
>>>>>>>> table should create a new pushdow handler to apply operator pushdowm 
>>>>>>>> for
>>>>>>>> each scan/stream, and create the scan/stream with the pushdown result. 
>>>>>>>> The
>>>>>>>> rationale is, a table should have the same pushdown behavior 
>>>>>>>> regardless the
>>>>>>>> scan node.
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Wenchen
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Fri, Aug 31, 2018 at 2:00 PM Reynold Xin <r...@databricks.com>
>>>>>>>> wrote:
>>>>>>>>
>>>>>>>>> I spent some time last week looking at the current data source v2
>>>>>>>>> apis, and I thought we should be a bit more buttoned up in terms of 
>>>>>>>>> the
>>>>>>>>> abstractions and the guarantees Spark provides. In particular, I feel 
>>>>>>>>> we
>>>>>>>>> need the following levels of "abstractions", to fit the use cases in 
>>>>>>>>> Spark,
>>>>>>>>> from batch, to streaming.
>>>>>>>>>
>>>>>>>>> Please don't focus on the naming at this stage. When possible, I
>>>>>>>>> draw parallels to what similar levels are named in the currently 
>>>>>>>>> committed
>>>>>>>>> api:
>>>>>>>>>
>>>>>>>>> 0. Format: This represents a specific format, e.g. Parquet, ORC.
>>>>>>>>> There is currently no explicit class at this level.
>>>>>>>>>
>>>>>>>>> 1. Table: This should represent a logical dataset (with schema).
>>>>>>>>> This could be just a directory on the file system, or a table in the
>>>>>>>>> catalog. Operations on tables can include batch reads (Scan), streams,
>>>>>>>>> writes, and potentially other operations such as deletes. The closest 
>>>>>>>>> to
>>>>>>>>> the table level abstraction in the current code base is the "Provider"
>>>>>>>>> class, although Provider isn't quite a Table. This is similar to 
>>>>>>>>> Ryan's
>>>>>>>>> proposed design.
>>>>>>>>>
>>>>>>>>> 2. Stream: Specific to streaming. A stream is created out of a
>>>>>>>>> Table. This logically represents a an instance of a StreamingQuery.
>>>>>>>>> Pushdowns and options are handled at this layer. I.e. Spark 
>>>>>>>>> guarnatees to
>>>>>>>>> data source implementation pushdowns and options don't change within a
>>>>>>>>> Stream. Each Stream consists of a sequence of scans. There is no
>>>>>>>>> equivalent concept in the current committed code.
>>>>>>>>>
>>>>>>>>> 3. Scan: A physical scan -- either as part of a streaming query,
>>>>>>>>> or a batch query. This should contain sufficient information and 
>>>>>>>>> methods so
>>>>>>>>> we can run a Spark job over a defined subset of the table. It's
>>>>>>>>> functionally equivalent to an RDD, except there's no dependency on 
>>>>>>>>> RDD so
>>>>>>>>> it is a smaller surface. In the current code, the equivalent class 
>>>>>>>>> would be
>>>>>>>>> the ScanConfig, which represents the information needed, but in order 
>>>>>>>>> to
>>>>>>>>> execute a job, ReadSupport is needed (various methods in ReadSupport 
>>>>>>>>> takes
>>>>>>>>> a ScanConfig).
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> To illustrate with pseudocode what the different levels mean, a
>>>>>>>>> batch query would look like the following:
>>>>>>>>>
>>>>>>>>> val provider = reflection[Format]("parquet")
>>>>>>>>> val table = provider.createTable(options)
>>>>>>>>> val scan = table.createScan(scanConfig) // scanConfig includes
>>>>>>>>> pushdown and options
>>>>>>>>> // run tasks on executors
>>>>>>>>>
>>>>>>>>> A streaming micro-batch scan would look like the following:
>>>>>>>>>
>>>>>>>>> val provider = reflection[Format]("parquet")
>>>>>>>>> val table = provider.createTable(options)
>>>>>>>>> val stream = table.createStream(scanConfig)
>>>>>>>>>
>>>>>>>>> while(true) {
>>>>>>>>>   val scan = streamingScan.createScan(startOffset)
>>>>>>>>>   // run tasks on executors
>>>>>>>>> }
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Vs the current API, the above:
>>>>>>>>>
>>>>>>>>> 1. Creates an explicit Table abstraction, and an explicit Scan
>>>>>>>>> abstraction.
>>>>>>>>>
>>>>>>>>> 2. Have an explicit Stream level and makes it clear pushdowns and
>>>>>>>>> options are handled there, rather than at the individual scan 
>>>>>>>>> (ReadSupport)
>>>>>>>>> level. Data source implementations don't need to worry about 
>>>>>>>>> pushdowns or
>>>>>>>>> options changing mid-stream. For batch, those happen when the scan 
>>>>>>>>> object
>>>>>>>>> is created.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> This email is just a high level sketch. I've asked Wenchen to
>>>>>>>>> prototype this, to see if it is actually feasible and the degree of 
>>>>>>>>> hacks
>>>>>>>>> it removes, or creates.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Ryan Blue
>>>>>>> Software Engineer
>>>>>>> Netflix
>>>>>>>
>>>>>>
>>>>>
>>>>> --
>>>>> Ryan Blue
>>>>> Software Engineer
>>>>> Netflix
>>>>>
>>>>
>>>
>>> --
>>> Ryan Blue
>>> Software Engineer
>>> Netflix
>>>
>>
>
> --
> Ryan Blue
> Software Engineer
> Netflix
>

Reply via email to