[ https://issues.apache.org/jira/browse/FLINK-8545?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16349724#comment-16349724 ]
Hequn Cheng edited comment on FLINK-8545 at 2/2/18 3:29 AM: ------------------------------------------------------------ Hi, [~fhueske] Great to hear your suggestions. I agree with you, so let's start with the {{DataStream}} to {{Table}} conversion first. I like your proposal especially for the upsert check. Besides, we may also need to support ingesting with delete messages. Something looks like: {code:java} DataStream[(String, Long, Int, Boolean)] input = ??? Table table = tEnv.upsertFromStream(input, 'a, 'b.rowtime.upsertOrder, 'c.key, 'd.isDelete) {code} But, this may bring some side effects, as current aggregations assume that all deletes are on condition of deleting an existing record. I don't have a complete solution yet. Maybe we may need some refactors to adapt this. I will think more about it. Thanks. was (Author: hequn8128): Hi, [~fhueske] Great to hear your suggestions. I agree with you, so let's start with the \{{DataStream}} to \{{Table}} conversion first. I like your proposal especially for the upsert check. Besides, we may also need to support ingesting with delete messages. Something looks like: {code:java} DataStream[(String, Long, Int, Boolean)] input = ??? Table table = tEnv.upsertFromStream(input, 'a, 'b.rowtime.upsertOrder, 'c.key, 'd.isDelete) {code} But, this may bring some side effects, as current aggregations assume that all deletes are on condition of deleting an existing record. I don't have a complete solution yet. Maybe we may need some refactors to adapt this. I will think more about it. Thanks. > Implement upsert stream table source > ------------------------------------- > > Key: FLINK-8545 > URL: https://issues.apache.org/jira/browse/FLINK-8545 > Project: Flink > Issue Type: New Feature > Components: Table API & SQL > Reporter: Hequn Cheng > Assignee: Hequn Cheng > Priority: Major > > As more and more users are eager for ingesting data with upsert mode in flink > sql/table-api, it is valuable to enable table source with upsert mode. I will > provide a design doc later and we can have more discussions. Any suggestions > are warmly welcomed ! > -- This message was sent by Atlassian JIRA (v7.6.3#76005)