[
https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16349724#comment-16349724
]
Hequn Cheng edited comment on FLINK-8545 at 2/2/18 3:29 AM:
------------------------------------------------------------
Hi, [~fhueske] Great to hear your suggestions. I agree with you, so let's
start with the {{DataStream}} to {{Table}} conversion first.
I like your proposal especially for the upsert check. Besides, we may also need
to support ingesting with delete messages. Something looks like:
{code:java}
DataStream[(String, Long, Int, Boolean)] input = ???
Table table = tEnv.upsertFromStream(input, 'a, 'b.rowtime.upsertOrder, 'c.key,
'd.isDelete)
{code}
But, this may bring some side effects, as current aggregations assume that all
deletes are on condition of deleting an existing record. I don't have a
complete solution yet. Maybe we may need some refactors to adapt this. I will
think more about it. Thanks.
was (Author: hequn8128):
Hi, [~fhueske] Great to hear your suggestions. I agree with you, so let's
start with the \{{DataStream}} to \{{Table}} conversion first.
I like your proposal especially for the upsert check. Besides, we may also need
to support ingesting with delete messages. Something looks like:
{code:java}
DataStream[(String, Long, Int, Boolean)] input = ???
Table table = tEnv.upsertFromStream(input, 'a, 'b.rowtime.upsertOrder, 'c.key,
'd.isDelete)
{code}
But, this may bring some side effects, as current aggregations assume that all
deletes are on condition of deleting an existing record. I don't have a
complete solution yet. Maybe we may need some refactors to adapt this. I will
think more about it. Thanks.
> Implement upsert stream table source
> -------------------------------------
>
> Key: FLINK-8545
> URL: https://issues.apache.org/jira/browse/FLINK-8545
> Project: Flink
> Issue Type: New Feature
> Components: Table API & SQL
> Reporter: Hequn Cheng
> Assignee: Hequn Cheng
> Priority: Major
>
> As more and more users are eager for ingesting data with upsert mode in flink
> sql/table-api, it is valuable to enable table source with upsert mode. I will
> provide a design doc later and we can have more discussions. Any suggestions
> are warmly welcomed !
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)