[ 
https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348675#comment-16348675
 ] 

Fabian Hueske commented on FLINK-8545:
--------------------------------------

Yes, many users have been asking for this feature recently.
Great that you're taking the initiative to work on this!

I'd propose to start with the {{DataStream}} to {{Table}} upsert 
conversion and work on the {{UpsertTableSource}} interface later, because the 
latter will need careful API design.

The keys could be defined similarly to the time indicators, i.e.,
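{code:scala}
// Proposed API sketch; upsertFromStream does not exist yet.
val input: DataStream[(String, Long, Int)] = ???
// upsert with key: 'c identifies the row to insert or update
val keyedTable: Table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
// upsert without key -> single row table
val singleRowTable: Table = tEnv.upsertFromStream(input, 'a, 'b, 'c){code}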
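A minimal sketch of the semantics these two calls are presumably meant to have, written in plain Java without any Flink dependency: with a key, each record replaces the previous row with the same key; without a key, each record replaces the single row of the table. The class {{UpsertSemanticsSketch}} and its methods are hypothetical names used only for illustration and are not part of any Flink API.
{code:java}
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class UpsertSemanticsSketch {

    /** A record with fields a, b, c, mirroring the (String, Long, Int) tuples. */
    public static final class Row {
        public final String a;
        public final long b;
        public final int c;

        public Row(String a, long b, int c) {
            this.a = a;
            this.b = b;
            this.c = c;
        }

        @Override
        public String toString() {
            return "(" + a + ", " + b + ", " + c + ")";
        }
    }

    /** Upsert with key c: the most recent record per key wins. */
    public static Map<Integer, Row> upsertByKey(List<Row> input) {
        Map<Integer, Row> table = new LinkedHashMap<>();
        for (Row r : input) {
            table.put(r.c, r); // insert if the key is new, otherwise overwrite
        }
        return table;
    }

    /** Upsert without key: the table holds at most one row, the most recent record. */
    public static List<Row> upsertWithoutKey(List<Row> input) {
        List<Row> table = new ArrayList<>();
        for (Row r : input) {
            table.clear(); // every record replaces the single row
            table.add(r);
        }
        return table;
    }

    public static void main(String[] args) {
        List<Row> input = Arrays.asList(
            new Row("x", 1L, 1),
            new Row("y", 2L, 2),
            new Row("z", 3L, 1)); // same key as the first record
        System.out.println("keyed:      " + upsertByKey(input).values());
        System.out.println("single row: " + upsertWithoutKey(input));
    }
}
{code}
In the keyed case the third record overwrites the first one because both carry key 1, so the table ends up with two rows; in the keyless case only the last record remains.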
I think it would be good to be able to declare a time attribute that decides 
whether an upsert is performed or not.
For example, it could look like this:
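{code:scala}
// Proposed API sketch; upsertFromStream and upsertOrder do not exist yet.
val input: DataStream[(String, Long, Int)] = ???
// 'b is the rowtime attribute that decides whether an upsert is applied; 'c is the key
val table: Table = tEnv.upsertFromStream(input, 'a, 'b.rowtime.upsertOrder, 'c.key){code}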
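One possible reading of {{upsertOrder}}, sketched in plain Java without Flink: an incoming record replaces the stored row for its key only if its timestamp is not older than that of the stored row. This interpretation, the tie-breaking rule (equal timestamps overwrite), and the name {{OrderedUpsertSketch}} are assumptions made for illustration; the actual semantics would still have to be decided.
{code:java}
import java.util.LinkedHashMap;
import java.util.Map;

public class OrderedUpsertSketch {

    /** A record (a, rowtime, key), mirroring fields 'a, 'b.rowtime and 'c.key. */
    public static final class Row {
        public final String a;
        public final long rowtime;
        public final int key;

        public Row(String a, long rowtime, int key) {
            this.a = a;
            this.rowtime = rowtime;
            this.key = key;
        }
    }

    private final Map<Integer, Row> table = new LinkedHashMap<>();

    /**
     * Applies the record unless a row with the same key and a strictly newer
     * timestamp is already stored. Returns true if the table was changed.
     */
    public boolean upsert(Row incoming) {
        Row current = table.get(incoming.key);
        if (current != null && incoming.rowtime < current.rowtime) {
            return false; // late record: keep the newer row (assumed behavior)
        }
        table.put(incoming.key, incoming);
        return true;
    }

    public Row get(int key) {
        return table.get(key);
    }

    public int size() {
        return table.size();
    }

    public static void main(String[] args) {
        OrderedUpsertSketch t = new OrderedUpsertSketch();
        t.upsert(new Row("new", 20L, 1));
        boolean applied = t.upsert(new Row("old", 10L, 1)); // arrives late
        System.out.println("late record applied: " + applied);
        System.out.println("row for key 1: " + t.get(1).a);
    }
}
{code}
Without such an ordering attribute, the late record would simply overwrite the newer row, which is the case the time attribute is meant to guard against under this reading.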
 

 

> Implement upsert stream table source 
> -------------------------------------
>
>                 Key: FLINK-8545
>                 URL: https://issues.apache.org/jira/browse/FLINK-8545
>             Project: Flink
>          Issue Type: New Feature
>          Components: Table API & SQL
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng
>            Priority: Major
>
> As more and more users want to ingest data in upsert mode in the Flink 
> SQL/Table API, it would be valuable to support table sources with upsert mode. 
> I will provide a design doc later so that we can discuss this further. Any 
> suggestions are warmly welcome!
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
