[
https://issues.apache.org/jira/browse/FLINK-6650?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16019334#comment-16019334
]
ASF GitHub Bot commented on FLINK-6650:
---------------------------------------
GitHub user sunjincheng121 opened a pull request:
https://github.com/apache/flink/pull/3958
[FLINK-6650][table] Improve the error message for toAppendStream
This PR makes three small changes:
1. Improve the error message for toAppendStream.
2. Fix an incorrect variable name.
3. Add JavaDoc for the key parameter of the method.
- [x] General
- The pull request references the related JIRA issue
("[FLINK-6650][table] Improve the error message for toAppendStream")
- The pull request addresses only one issue
- Each commit in the PR has a meaningful commit message (including the
JIRA id)
- [ ] Documentation
- Documentation has been added for new functionality
- Old documentation affected by the pull request has been updated
- JavaDoc for public methods has been added
- [ ] Tests & Build
- Functionality added by the pull request is covered by tests
- `mvn clean verify` has been executed successfully locally or a Travis
build has passed
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/sunjincheng121/flink FLINK-6650-PR
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/3958.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #3958
----
> Fix Non-windowed group-aggregate error when using append-table mode.
> --------------------------------------------------------------------
>
> Key: FLINK-6650
> URL: https://issues.apache.org/jira/browse/FLINK-6650
> Project: Flink
> Issue Type: Sub-task
> Components: Table API & SQL
> Reporter: sunjincheng
> Assignee: sunjincheng
>
> When I test a non-windowed group aggregate with {{stream.toTable(tEnv, 'a, 'b,
> 'c).select('a.sum, weightAvgFun('a, 'b)).toAppendStream[Row].addSink(new
> StreamITCase.StringSink)}}, I get the following error:
> {code}
> org.apache.flink.table.api.TableException: Table is not an append-only table. Output needs to handle update and delete changes.
>     at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:631)
>     at org.apache.flink.table.api.StreamTableEnvironment.translate(StreamTableEnvironment.scala:607)
>     at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:219)
>     at org.apache.flink.table.api.scala.StreamTableEnvironment.toAppendStream(StreamTableEnvironment.scala:195)
>     at org.apache.flink.table.api.scala.TableConversions.toAppendStream(TableConversions.scala:121)
> {code}
> The reason is that {{DataStreamGroupAggregate#producesUpdates}} is defined as follows:
> {code}
> override def producesUpdates = true
> {code}
> I think that, from the user's point of view, the desired behavior is the following (for example):
> Data:
> {code}
> val data = List(
> (1L, 1, "Hello"),
> (2L, 2, "Hello"),
> (3L, 3, "Hello"),
> (4L, 4, "Hello"),
> (5L, 5, "Hello"),
> (6L, 6, "Hello"),
> (7L, 7, "Hello World"),
> (8L, 8, "Hello World"),
> (20L, 20, "Hello World"))
> {code}
> * Case 1:
> Table API:
> {code}
> stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum)
> .toAppendStream[Row].addSink(new StreamITCase.StringSink)
> {code}
> Result:
> {code}
> // Data processed by StringSink:
> 1
> 3
> 6
> 10
> 15
> 21
> 28
> 36
> 56
> // Final output data:
> 1
> 3
> 6
> 10
> 15
> 21
> 28
> 36
> 56
> {code}
> * Case 2:
> Table API:
> {code}
> stream.toTable(tEnv, 'a, 'b, 'c).select('a.sum).toRetractStream[Row]
> .addSink(new StreamITCase.RetractingSink)
> {code}
> Result:
> {code}
> // Data processed by RetractingSink:
> (true,1)
> (false,1)
> (true,3)
> (false,3)
> (true,6)
> (false,6)
> (true,10)
> (false,10)
> (true,15)
> (false,15)
> (true,21)
> (false,21)
> (true,28)
> (false,28)
> (true,36)
> (false,36)
> (true,56)
> // Final output data:
> 56
> {code}
> In fact, Case 1 can be expressed using an unbounded OVER window, as follows:
> Table API:
> {code}
> stream.toTable(tEnv, 'a, 'b, 'c, 'proctime.proctime)
> .window(Over orderBy 'proctime preceding UNBOUNDED_ROW as 'w)
> .select('a.sum over 'w)
> .toAppendStream[Row].addSink(new StreamITCase.StringSink)
> {code}
> Result:
> {code}
> Same as Case 1
> {code}
> However, after [FLINK-6649 | https://issues.apache.org/jira/browse/FLINK-6649],
> OVER cannot express Case 1 with earlyFiring.
> So I still think that a non-windowed group aggregate should not always produce
> an update table; the user should be able to decide which mode to use.
> Is there any drawback to this improvement? Any feedback is welcome.
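
The difference between the two cases in the issue description can be illustrated with a minimal plain-Scala sketch. It does not use Flink at all: the object `AppendVsRetractSketch` and its members are hypothetical names made up for illustration, and the code only imitates the sink input shown for Case 1 and Case 2, assuming the sum is taken over column 'a of the sample data. It is not how `DataStreamGroupAggregate` is implemented.

```scala
// Plain-Scala sketch with no Flink dependency. All names are hypothetical.
object AppendVsRetractSketch {
  // Values of column 'a taken from the sample data in the issue.
  val a: List[Long] = List(1L, 2L, 3L, 4L, 5L, 6L, 7L, 8L, 20L)

  // Case 1: the running sum after each input row, emitted as plain appended rows.
  val appendRows: List[Long] = a.scanLeft(0L)(_ + _).tail

  // Case 2: the first sum is added (true); every later sum is preceded by a
  // retraction (false) of the sum it replaces.
  val retractRows: List[(Boolean, Long)] =
    (true, appendRows.head) :: appendRows.zip(appendRows.tail).flatMap {
      case (prev, next) => List((false, prev), (true, next))
    }

  // Apply add/retract messages in order to obtain the content a retracting
  // sink would hold at the end.
  def materialize(msgs: List[(Boolean, Long)]): List[Long] =
    msgs.foldLeft(List.empty[Long]) {
      case (acc, (true, v))  => v :: acc
      case (acc, (false, v)) => acc.diff(List(v)) // drop one occurrence of v
    }

  def main(args: Array[String]): Unit = {
    println(appendRows.mkString(","))
    println(retractRows.mkString(" "))
    println(materialize(retractRows).mkString(","))
  }
}
```

In this sketch the append-style output keeps every intermediate sum, while materializing the retract-style messages leaves only the last sum, which mirrors the "final output data" of the two cases.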
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)