dianfu commented on a change in pull request #8311: [FLINK-10976][table] Add
Aggregate operator to Table API
URL: https://github.com/apache/flink/pull/8311#discussion_r281000946
##########
File path: docs/dev/table/tableApi.md
##########
@@ -1957,6 +2008,52 @@ class MyFlatMapFunction extends TableFunction[Row] {
val func = new MyFlatMapFunction
val table = input
.flatMap(func('c)).as('a, 'b)
+{% endhighlight %}
+ </td>
+ </tr>
+
+ <tr>
+ <td>
+ <strong>Aggregate</strong><br>
+ <span class="label label-primary">Batch</span> <span class="label label-primary">Streaming</span>
+ </td>
+ <td>
+ <p>Performs an aggregate operation with an aggregate function. You have to close the "aggregate" with a select statement. The output of aggregate will be flattened if the output type is a composite type.</p>
+{% highlight scala %}
+case class MyMinMaxAcc(var min: Int, var max: Int)
+
+class MyMinMax extends AggregateFunction[Row, MyMinMaxAcc] {
+
+ def accumulate(acc: MyMinMaxAcc, value: Int): Unit = {
+ if (value < acc.min) {
+ acc.min = value
+ }
+ if (value > acc.max) {
+ acc.max = value
+ }
+ }
+
+ def resetAccumulator(acc: MyMinMaxAcc): Unit = {
Review comment:
The Scala example defines resetAccumulator, while the Java example does not.
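To illustrate the gap, here is a minimal sketch of what a matching Java version might look like. It is a plain-Java stand-in with no Flink dependency: the wrapper class `MyMinMaxSketch`, the `createAccumulator` helper, and the sentinel values used on reset are assumptions made for illustration, not the code in this pull request (the body of the Scala `resetAccumulator` is cut off in the hunk above).

```java
// Plain-Java stand-in for the documented example; it does not extend
// Flink's AggregateFunction, and all names here are hypothetical.
public class MyMinMaxSketch {

    /** Accumulator holding the running minimum and maximum. */
    static class MyMinMaxAcc {
        int min;
        int max;
    }

    /** Mirrors the shape of the MyMinMax function from the Scala example. */
    static class MyMinMax {

        // Creates an accumulator in its reset state.
        MyMinMaxAcc createAccumulator() {
            MyMinMaxAcc acc = new MyMinMaxAcc();
            resetAccumulator(acc);
            return acc;
        }

        // Same logic as the Scala accumulate method in the diff.
        void accumulate(MyMinMaxAcc acc, int value) {
            if (value < acc.min) {
                acc.min = value;
            }
            if (value > acc.max) {
                acc.max = value;
            }
        }

        // The method the Scala example has and the Java example lacks.
        // Assumption: reset to sentinels so that any input replaces them.
        void resetAccumulator(MyMinMaxAcc acc) {
            acc.min = Integer.MAX_VALUE;
            acc.max = Integer.MIN_VALUE;
        }
    }

    public static void main(String[] args) {
        MyMinMax function = new MyMinMax();
        MyMinMaxAcc acc = function.createAccumulator();
        function.accumulate(acc, 3);
        function.accumulate(acc, -2);
        System.out.println("min=" + acc.min + ", max=" + acc.max);
    }
}
```

Whichever initial values the documentation settles on, keeping the same set of methods in both language tabs would make the two examples easier to compare.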
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
With regards,
Apache Git Services