Wenchen,

I'm not really sure what you're proposing here. What is a `LogicalWrite`?
Is it something that mirrors the read side in your PR?

I think that I agree that if we have a Write independent of the Table that
carries the commit and abort methods, then we can create it directly
without a WriteConfig. So I tentatively agree with what you propose,
assuming that I understand it correctly.

rb

On Tue, Sep 4, 2018 at 8:42 PM Wenchen Fan <cloud0...@gmail.com> wrote:

> I'm switching to my another Gmail account, let's see if it still gets
> dropped this time.
>
> Hi Ryan,
>
> I'm thinking about the write path and feel the abstraction should be the
> same.
>
> We still have logical and physical writing. And the table can create
> different logical writing based on how to write. e.g., append, delete,
> replaceWhere, etc.
>
> One thing I'm not sure about is the WriteConfig. With the WriteConfig, the
> API would look like
> trait Table {
>   WriteConfig newAppendWriteConfig();
>
>   WriteConfig newDeleteWriteConfig(deleteExprs);
>
>   LogicalWrite newLogicalWrite(writeConfig);
> }
>
> Without WriteConfig, the API looks like
> trait Table {
>   LogicalWrite newAppendWrite();
>
>   LogicalWrite newDeleteWrite(deleteExprs);
> }
>
>
> It looks to me that the API is simpler without WriteConfig, what do you
> think?
>
> Thanks,
> Wenchen
>
> On Wed, Sep 5, 2018 at 4:24 AM Ryan Blue <rb...@netflix.com.invalid>
> wrote:
>
>> Latest from Wenchen in case it was dropped.
>>
>> ---------- Forwarded message ---------
>> From: Wenchen Fan <wenc...@databricks.com>
>> Date: Mon, Sep 3, 2018 at 6:16 AM
>> Subject: Re: data source api v2 refactoring
>> To: <mri...@gmail.com>
>> Cc: Ryan Blue <rb...@netflix.com>, Reynold Xin <r...@databricks.com>, <
>> dev@spark.apache.org>
>>
>>
>> Hi Mridul,
>>
>> I'm not sure what's going on, my email was CC'ed to the dev list.
>>
>>
>> Hi Ryan,
>>
>> The logical and physical scan idea sounds good. To add more color
>> to Jungtaek's question, both micro-batch and continuous mode have
>> the logical and physical scan, but there is a difference: for micro-batch
>> mode, a physical scan outputs data for one epoch, but it's not true for
>> continuous mode.
>>
>> I'm not sure if it's necessary to include streaming epoch in the API
>> abstraction, for features like metrics reporting.
>>
>> On Sun, Sep 2, 2018 at 12:31 PM Mridul Muralidharan <mri...@gmail.com>
>> wrote:
>>
>>>
>>> Is it only me or are all others getting Wenchen’s mails ? (Obviously
>>> Ryan did :-) )
>>> I did not see it in the mail thread I received or in archives ... [1]
>>> Wondering which othersenderswere getting dropped (if yes).
>>>
>>> Regards
>>> Mridul
>>>
>>> [1]
>>> http://apache-spark-developers-list.1001551.n3.nabble.com/data-source-api-v2-refactoring-td24848.html
>>>
>>>
>>> On Sat, Sep 1, 2018 at 8:58 PM Ryan Blue <rb...@netflix.com.invalid>
>>> wrote:
>>>
>>>> Thanks for clarifying, Wenchen. I think that's what I expected.
>>>>
>>>> As for the abstraction, here's the way that I think about it: there are
>>>> two important parts of a scan: the definition of what will be read, and
>>>> task sets that actually perform the read. In batch, there's one definition
>>>> of the scan and one task set so it makes sense that there's one scan object
>>>> that encapsulates both of these concepts. For streaming, we need to
>>>> separate the two into the definition of what will be read (the stream or
>>>> streaming read) and the task sets that are run (scans). That way, the
>>>> streaming read behaves like a factory for scans, producing scans that
>>>> handle the data either in micro-batches or using continuous tasks.
>>>>
>>>> To address Jungtaek's question, I think that this does work with
>>>> continuous. In continuous mode, the query operators keep running and send
>>>> data to one another directly. The API still needs a streaming read layer
>>>> because it may still produce more than one continuous scan. That would
>>>> happen when the underlying source changes and Spark needs to reconfigure. I
>>>> think the example here is when partitioning in a Kafka topic changes and
>>>> Spark needs to re-map Kafka partitions to continuous tasks.
>>>>
>>>> rb
>>>>
>>>> On Fri, Aug 31, 2018 at 5:12 PM Wenchen Fan <wenc...@databricks.com>
>>>> wrote:
>>>>
>>>>> Hi Ryan,
>>>>>
>>>>> Sorry I may use a wrong wording. The pushdown is done with ScanConfig,
>>>>> which is not table/stream/scan, but something between them. The table
>>>>> creates ScanConfigBuilder, and table creates stream/scan with ScanConfig.
>>>>> For streaming source, stream is the one to take care of the pushdown
>>>>> result. For batch source, it's the scan.
>>>>>
>>>>> It's a little tricky because stream is an abstraction for streaming
>>>>> source only. Better ideas are welcome!
>>>>>
>>>>
>>>>> On Sat, Sep 1, 2018 at 7:26 AM Ryan Blue <rb...@netflix.com> wrote:
>>>>>
>>>>>> Thanks, Reynold!
>>>>>>
>>>>>> I think your API sketch looks great. I appreciate having the Table
>>>>>> level in the abstraction to plug into as well. I think this makes it 
>>>>>> clear
>>>>>> what everything does, particularly having the Stream level that 
>>>>>> represents
>>>>>> a configured (by ScanConfig) streaming read and can act as a factory for
>>>>>> individual batch scans or for continuous scans.
>>>>>>
>>>>>> Wenchen, I'm not sure what you mean by doing pushdown at the table
>>>>>> level. It seems to mean that pushdown is specific to a batch scan or
>>>>>> streaming read, which seems to be what you're saying as well. Wouldn't 
>>>>>> the
>>>>>> pushdown happen to create a ScanConfig, which is then used as Reynold
>>>>>> suggests? Looking forward to seeing this PR when you get it posted. 
>>>>>> Thanks
>>>>>> for all of your work on this!
>>>>>>
>>>>>> rb
>>>>>>
>>>>>> On Fri, Aug 31, 2018 at 3:52 PM Wenchen Fan <wenc...@databricks.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Thank Reynold for writing this and starting the discussion!
>>>>>>>
>>>>>>> Data source v2 was started with batch only, so we didn't pay much
>>>>>>> attention to the abstraction and just follow the v1 API. Now we are
>>>>>>> designing the streaming API and catalog integration, the abstraction
>>>>>>> becomes super important.
>>>>>>>
>>>>>>> I like this proposed abstraction and have successfully prototyped it
>>>>>>> to make sure it works.
>>>>>>>
>>>>>>> During prototyping, I have to work around the issue that the current
>>>>>>> streaming engine does query optimization/planning for each micro batch.
>>>>>>> With this abstraction, the operator pushdown is only applied once
>>>>>>> per-query. In my prototype, I do the physical planning up front to get 
>>>>>>> the
>>>>>>> pushdown result, and
>>>>>>> add a logical linking node that wraps the resulting physical plan
>>>>>>> node for the data source, and then swap that logical linking node into 
>>>>>>> the
>>>>>>> logical plan for each batch. In the future we should just let the 
>>>>>>> streaming
>>>>>>> engine do query optimization/planning only once.
>>>>>>>
>>>>>>> About pushdown, I think we should do it at the table level. The
>>>>>>> table should create a new pushdow handler to apply operator pushdowm for
>>>>>>> each scan/stream, and create the scan/stream with the pushdown result. 
>>>>>>> The
>>>>>>> rationale is, a table should have the same pushdown behavior regardless 
>>>>>>> the
>>>>>>> scan node.
>>>>>>>
>>>>>>> Thanks,
>>>>>>> Wenchen
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Fri, Aug 31, 2018 at 2:00 PM Reynold Xin <r...@databricks.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> I spent some time last week looking at the current data source v2
>>>>>>>> apis, and I thought we should be a bit more buttoned up in terms of the
>>>>>>>> abstractions and the guarantees Spark provides. In particular, I feel 
>>>>>>>> we
>>>>>>>> need the following levels of "abstractions", to fit the use cases in 
>>>>>>>> Spark,
>>>>>>>> from batch, to streaming.
>>>>>>>>
>>>>>>>> Please don't focus on the naming at this stage. When possible, I
>>>>>>>> draw parallels to what similar levels are named in the currently 
>>>>>>>> committed
>>>>>>>> api:
>>>>>>>>
>>>>>>>> 0. Format: This represents a specific format, e.g. Parquet, ORC.
>>>>>>>> There is currently no explicit class at this level.
>>>>>>>>
>>>>>>>> 1. Table: This should represent a logical dataset (with schema).
>>>>>>>> This could be just a directory on the file system, or a table in the
>>>>>>>> catalog. Operations on tables can include batch reads (Scan), streams,
>>>>>>>> writes, and potentially other operations such as deletes. The closest 
>>>>>>>> to
>>>>>>>> the table level abstraction in the current code base is the "Provider"
>>>>>>>> class, although Provider isn't quite a Table. This is similar to Ryan's
>>>>>>>> proposed design.
>>>>>>>>
>>>>>>>> 2. Stream: Specific to streaming. A stream is created out of a
>>>>>>>> Table. This logically represents a an instance of a StreamingQuery.
>>>>>>>> Pushdowns and options are handled at this layer. I.e. Spark guarnatees 
>>>>>>>> to
>>>>>>>> data source implementation pushdowns and options don't change within a
>>>>>>>> Stream. Each Stream consists of a sequence of scans. There is no
>>>>>>>> equivalent concept in the current committed code.
>>>>>>>>
>>>>>>>> 3. Scan: A physical scan -- either as part of a streaming query, or
>>>>>>>> a batch query. This should contain sufficient information and methods 
>>>>>>>> so we
>>>>>>>> can run a Spark job over a defined subset of the table. It's 
>>>>>>>> functionally
>>>>>>>> equivalent to an RDD, except there's no dependency on RDD so it is a
>>>>>>>> smaller surface. In the current code, the equivalent class would be the
>>>>>>>> ScanConfig, which represents the information needed, but in order to
>>>>>>>> execute a job, ReadSupport is needed (various methods in ReadSupport 
>>>>>>>> takes
>>>>>>>> a ScanConfig).
>>>>>>>>
>>>>>>>>
>>>>>>>> To illustrate with pseudocode what the different levels mean, a
>>>>>>>> batch query would look like the following:
>>>>>>>>
>>>>>>>> val provider = reflection[Format]("parquet")
>>>>>>>> val table = provider.createTable(options)
>>>>>>>> val scan = table.createScan(scanConfig) // scanConfig includes
>>>>>>>> pushdown and options
>>>>>>>> // run tasks on executors
>>>>>>>>
>>>>>>>> A streaming micro-batch scan would look like the following:
>>>>>>>>
>>>>>>>> val provider = reflection[Format]("parquet")
>>>>>>>> val table = provider.createTable(options)
>>>>>>>> val stream = table.createStream(scanConfig)
>>>>>>>>
>>>>>>>> while(true) {
>>>>>>>>   val scan = streamingScan.createScan(startOffset)
>>>>>>>>   // run tasks on executors
>>>>>>>> }
>>>>>>>>
>>>>>>>>
>>>>>>>> Vs the current API, the above:
>>>>>>>>
>>>>>>>> 1. Creates an explicit Table abstraction, and an explicit Scan
>>>>>>>> abstraction.
>>>>>>>>
>>>>>>>> 2. Have an explicit Stream level and makes it clear pushdowns and
>>>>>>>> options are handled there, rather than at the individual scan 
>>>>>>>> (ReadSupport)
>>>>>>>> level. Data source implementations don't need to worry about pushdowns 
>>>>>>>> or
>>>>>>>> options changing mid-stream. For batch, those happen when the scan 
>>>>>>>> object
>>>>>>>> is created.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> This email is just a high level sketch. I've asked Wenchen to
>>>>>>>> prototype this, to see if it is actually feasible and the degree of 
>>>>>>>> hacks
>>>>>>>> it removes, or creates.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>
>>>>>> --
>>>>>> Ryan Blue
>>>>>> Software Engineer
>>>>>> Netflix
>>>>>>
>>>>>
>>>>
>>>> --
>>>> Ryan Blue
>>>> Software Engineer
>>>> Netflix
>>>>
>>>
>>
>> --
>> Ryan Blue
>> Software Engineer
>> Netflix
>>
>

-- 
Ryan Blue
Software Engineer
Netflix

Reply via email to