[
https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16348675#comment-16348675
]
Fabian Hueske commented on FLINK-8545:
--------------------------------------
Yes, many users have been asking for this feature recently.
Great that you're taking the initiative to work on this!
I'd propose to start with the {{DataStream}} to {{Table}} upsert
conversion and work on the {{UpsertTableSource}} interface later, because the
latter will need careful API design.
The keys could be defined similarly to the time indicators, i.e.,
{code:scala}
val input: DataStream[(String, Long, Int)] = ???
// upsert with key: field 'c is declared as the key (proposed API, not implemented yet)
val keyedTable: Table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
// upsert without key -> single row table
val singleRowTable: Table = tEnv.upsertFromStream(input, 'a, 'b, 'c){code}
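To make the intended semantics concrete, here is a minimal plain-Scala sketch (no Flink dependency) of how such an upsert stream might materialize into a table. It assumes that a record with a key replaces the previous row for that key, and that a stream without a key collapses to a single row holding the latest record. The names {{UpsertSketch}}, {{materializeByKey}}, and {{materializeSingleRow}} are hypothetical and not part of any Flink API.

```scala
// Hypothetical illustration only; this is not Flink code.
object UpsertSketch {
  // Row layout mirrors the (String, Long, Int) stream: fields (a, b, c).
  type Row = (String, Long, Int)

  // Upsert with key: field c is the key, so a later record
  // replaces the stored row that has the same value of c.
  def materializeByKey(records: Seq[Row]): Map[Int, Row] =
    records.foldLeft(Map.empty[Int, Row]) { (table, row) =>
      table.updated(row._3, row)
    }

  // Upsert without key: every record replaces the single row of the table,
  // so only the last record (if any) remains.
  def materializeSingleRow(records: Seq[Row]): Option[Row] =
    records.lastOption

  def main(args: Array[String]): Unit = {
    val records = Seq(("x", 1L, 1), ("y", 2L, 2), ("z", 3L, 1))
    println(materializeByKey(records))     // two rows, one per distinct key
    println(materializeSingleRow(records)) // only the latest record
  }
}
```

In this sketch the keyed table grows with the number of distinct keys, while the keyless table never holds more than one row.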
I think it would be good to be able to declare a time attribute that decides
whether an upsert is performed or not.
For example, this could look as follows:
{code:scala}
val input: DataStream[(String, Long, Int)] = ???
// 'b is the rowtime attribute that orders upserts; 'c is the key (proposed API)
val table: Table = tEnv.upsertFromStream(input, 'a, 'b.rowtime.upsertOrder, 'c.key){code}
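One possible reading of the {{upsertOrder}} idea, sketched in plain Scala without Flink: a record replaces the stored row for its key only if its time attribute is not older than the stored one, so late records are ignored. How ties and late records should actually be handled is an assumption here, and {{OrderedUpsertSketch}} and {{materialize}} are hypothetical names, not a Flink API.

```scala
// Hypothetical illustration only; this is not Flink code.
object OrderedUpsertSketch {
  // Fields (a, b, c): b plays the role of the rowtime / upsertOrder attribute,
  // c is the key.
  type Row = (String, Long, Int)

  // Apply records in arrival order. A record is dropped when the row stored
  // for its key carries a strictly newer timestamp (assumed policy).
  def materialize(records: Seq[Row]): Map[Int, Row] =
    records.foldLeft(Map.empty[Int, Row]) { (table, row) =>
      table.get(row._3) match {
        case Some(current) if current._2 > row._2 => table // stale record, skip
        case _ => table.updated(row._3, row)               // insert or update
      }
    }

  def main(args: Array[String]): Unit = {
    val records = Seq(("new", 5L, 1), ("late", 3L, 1), ("other", 4L, 2))
    println(materialize(records)) // the late record for key 1 is ignored
  }
}
```

Under this policy a record with an equal timestamp still overwrites the stored row; a stricter comparison would be an equally plausible design.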
> Implement upsert stream table source
> -------------------------------------
>
> Key: FLINK-8545
> URL: https://issues.apache.org/jira/browse/FLINK-8545
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Hequn Cheng
> Assignee: Hequn Cheng
> Priority: Major
>
> As more and more users are eager to ingest data in upsert mode in Flink
> SQL/Table API, it is valuable to enable table sources with upsert mode. I will
> provide a design doc later so that we can discuss it further. Any suggestions
> are warmly welcome!
>