[ 
https://issues.apache.org/jira/browse/FLINK-6650?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

sunjincheng updated FLINK-6650:
-------------------------------
    Description: 
When I test a non-windowed group-aggregate with {{stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum, weightAvgFun('a, 'b)).toAppendStream[Row].addSink(new StreamITCase.StringSink)}}, I get the following error:
{code}
org.apache.flink.table.api.TableException: Table is not an append-only table. Output needs to handle update and delete changes.
        at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:631)
        at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:607)
        at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:219)
        at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:195)
        at org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:121)
{code}
The reason is that {{DataStreamGroupAggregate#producesUpdates}} always returns true:
{code}
override def producesUpdates = true
{code}

From the user's point of view, I think what users want is one of the following (for example):
Data:
{code}
val data = List(
      (1L, 1, "Hello"),
      (2L, 2, "Hello"),
      (3L, 3, "Hello"),
      (4L, 4, "Hello"),
      (5L, 5, "Hello"),
      (6L, 6, "Hello"),
      (7L, 7, "Hello World"),
      (8L, 8, "Hello World"),
      (20L, 20, "Hello World"))
{code}

* Case 1:
TableAPI
{code}
stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum).toAppendStream[Row]
.addSink(new StreamITCase.StringSink)
{code}
Result
{code}
1
3
6
10
15
21
28
36
56
{code}

* Case 2:
TableAPI
{code}
stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum).toRetractStream[Row]
.addSink(new StreamITCase.RetractingSink)
{code}
Result:
{code}
56
{code}
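
The difference between the two result listings above can be modelled in plain Scala without Flink. This is only a minimal sketch: the object name {{GroupAggregateModes}} and the encoding of retractions as (Boolean, value) pairs are assumptions made for illustration, not a description of how Flink implements either conversion.

```scala
// Plain-Scala model of the two output modes; no Flink dependency.
object GroupAggregateModes {
  // Column 'a of the sample data above.
  val a: List[Long] = List(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 20L)

  // Case 1 (append-style): every intermediate sum is emitted as a new row.
  val appendRows: List[Long] = a.scanLeft(0L)(_ + _).tail

  // Case 2 (retract-style): each update first retracts the previous sum
  // (flag = false) and then adds the new one (flag = true).
  val retractMessages: List[(Boolean, Long)] =
    appendRows.zipWithIndex.flatMap { case (sum, i) =>
      val retraction = if (i == 0) Nil else List((false, appendRows(i - 1)))
      retraction :+ ((true, sum))
    }

  // A sink that applies retractions keeps only rows that were never retracted.
  val materialized: List[Long] =
    retractMessages.foldLeft(List.empty[Long]) {
      case (rows, (true, v))  => rows :+ v
      case (rows, (false, v)) => rows.diff(List(v))
    }

  def main(args: Array[String]): Unit = {
    println(appendRows.mkString(", "))   // running sums, as in the first result
    println(materialized.mkString(", ")) // single final sum, as in the second result
  }
}
```

In this model the append-style output keeps all nine running sums, while the sink that applies retractions ends up with only the final sum.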
In fact, Case 1 can be expressed with an unbounded OVER window, as follows:
TableAPI
{code}
stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
    .window(Over orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
    .select('a.sum over 'w)
    .toAppendStream[Row].addSink(new StreamITCase.StringSink)
{code}
Result
{code}
Same as Case 1
{code}

But after [FLINK-6649 | https://issues.apache.org/jira/browse/FLINK-6649], 
OVER cannot express Case 1 with earlyFiring.

So I still think a non-windowed group-aggregate should not always be treated as 
an update table; the user should be able to decide which mode to use.

Is there any drawback to this improvement? Any feedback is welcome.



> Fix Non-windowed group-aggregate error when using append-table mode.
> --------------------------------------------------------------------
>
>                 Key: FLINK-6650
>                 URL: https://issues.apache.org/jira/browse/FLINK-6650
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Table API & SQL
>            Reporter: sunjincheng
>            Assignee: sunjincheng



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
