[ 
https://issues.apache.org/jira/browse/FLINK-8577?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16356714#comment-16356714
 ] 

Fabian Hueske edited comment on FLINK-8577 at 2/8/18 9:30 AM:
--------------------------------------------------------------

I was thinking about how to handle deletion flags. I see three options:
 # always require deletion flags. Append-only DataStreams would need an additional map to add the flag.
 # have special methods for upserts with and without deletion flags.
 # have a mandatory parameter that determines whether the input has flags or not.

So the choices are:

{code}
val input: DataStream[(String, Long, Int)] = ???
val flaggedInput: DataStream[(Boolean, (String, Long, Int))] = ???

// WITH DELETE FLAGS

// option 1: always require flags
table = tEnv.upsertFromStream(flaggedInput, 'a, 'b, 'c.key)
// option 2: special method
table = tEnv.upsertFromStreamWithDeletes(flaggedInput, 'a, 'b, 'c.key)
// option 3: mandatory parameter
table = tEnv.upsertFromStream(flaggedInput, deletes = true, 'a, 'b, 'c.key)

// WITHOUT DELETE FLAGS

// option 1: always require flags, so add them manually
table = tEnv.upsertFromStream(input.map(new InsertFlagger()), 'a, 'b, 'c.key)
// option 2: special method
table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
// option 3: mandatory parameter
table = tEnv.upsertFromStream(input, deletes = false, 'a, 'b, 'c.key)
{code}

I prefer option 1 because it keeps the API slim and is consistent with the record format of the UpsertStreamTableSink.
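The flag convention behind option 1 can be illustrated with a minimal plain-Scala model that does not use Flink at all. It assumes that {{true}} marks an upsert (insert or update) and {{false}} a delete, and that the table is keyed on the third field {{c}}, as in {{'c.key}}. The names {{FlaggedUpsert}}, {{insertFlagger}} and {{materialize}} are hypothetical; they stand in for the proposed {{InsertFlagger}} map function and for the table that the runtime would maintain.

{code:scala}
// Plain-Scala model of option 1 (no Flink dependency). Assumption: every
// record carries a Boolean flag, true = upsert (insert or update), false = delete.
object FlaggedUpsert {
  type Rec = (String, Long, Int)

  // Hypothetical counterpart of the InsertFlagger map function: an append-only
  // stream only ever inserts, so each record is flagged as an upsert.
  def insertFlagger(r: Rec): (Boolean, Rec) = (true, r)

  // Applies the flagged changes in order to a table keyed on field c
  // (the third tuple field), mirroring the 'c.key declaration.
  def materialize(changes: Seq[(Boolean, Rec)]): Map[Int, Rec] =
    changes.foldLeft(Map.empty[Int, Rec]) {
      case (table, (true, r))  => table + (r._3 -> r) // insert or overwrite row for key c
      case (table, (false, r)) => table - r._3        // delete row for key c
    }
}
{code}

Under this convention an append-only stream only needs {{input.map(insertFlagger)}}, which is exactly the extra step option 1 asks of users; deletes are possible only when the producer emits {{false}} flags itself.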


What do you think [~hequn8128], [~twalthr]?



> Implement proctime DataStream to Table upsert conversion.
> ---------------------------------------------------------
>
>                 Key: FLINK-8577
>                 URL: https://issues.apache.org/jira/browse/FLINK-8577
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: Hequn Cheng
>            Assignee: Hequn Cheng
>            Priority: Major
>
> The API will look like:
> {code:scala}
> val input: DataStream[(String, Long, Int)] = ???
> // upsert with key
> val keyedTable: Table = tEnv.upsertFromStream(input, 'a, 'b, 'c.key)
> // upsert without key -> single row table
> val singleRowTable: Table = tEnv.upsertFromStream(input, 'a, 'b, 'c)
> {code}
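The two conversions in the description differ only in how rows are identified. The plain-Scala sketch below (no Flink dependency; {{KeyedVsSingleRow}}, {{upsertWithKey}} and {{upsertWithoutKey}} are hypothetical names) models them under two assumptions: records are applied in arrival order, as processing time implies, and every record is an upsert. With {{'c.key}} the latest record per value of {{c}} survives; without a key each record replaces the previous one, so the table holds at most one row.

{code:scala}
// Plain-Scala model of the keyed and unkeyed upsert conversions.
// Assumption: records arrive in processing-time order and none are deletes.
object KeyedVsSingleRow {
  type Rec = (String, Long, Int)

  // 'c.key: keep the most recent record for each value of field c.
  def upsertWithKey(stream: Seq[Rec]): Map[Int, Rec] =
    stream.foldLeft(Map.empty[Int, Rec])((table, r) => table + (r._3 -> r))

  // No key: every record overwrites the whole table, leaving at most one row.
  def upsertWithoutKey(stream: Seq[Rec]): Option[Rec] =
    stream.lastOption
}
{code}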



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Reply via email to