This is an automated email from the ASF dual-hosted git repository.

jincheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a12908  [FLINK-11147][table][docs] Add documentation for 
TableAggregate Function
6a12908 is described below

commit 6a12908b15c398e37f8603cd84e0d30e14d07784
Author: hequn8128 <[email protected]>
AuthorDate: Fri Jun 21 12:48:11 2019 +0800

    [FLINK-11147][table][docs] Add documentation for TableAggregate Function
    
    This close #8669
---
 docs/dev/table/tableApi.md    |   4 +-
 docs/dev/table/tableApi.zh.md |   4 +-
 docs/dev/table/udfs.md        | 704 +++++++++++++++++++++++++++++++++++++++---
 docs/dev/table/udfs.zh.md     | 704 +++++++++++++++++++++++++++++++++++++++---
 docs/fig/udtagg-mechanism.png | Bin 0 -> 150838 bytes
 5 files changed, 1328 insertions(+), 88 deletions(-)

diff --git a/docs/dev/table/tableApi.md b/docs/dev/table/tableApi.md
index 4fb9f75..a0bd51a 100644
--- a/docs/dev/table/tableApi.md
+++ b/docs/dev/table/tableApi.md
@@ -2645,7 +2645,7 @@ Table table = input
       </td>
       <td>
         <p>Similar to a <b>GroupBy Aggregation</b>. Groups the rows on the 
grouping keys with the following running table aggregation operator to 
aggregate rows group-wise. The difference from an AggregateFunction is that 
TableAggregateFunction may return 0 or more records for a group. You have to 
close the "flatAggregate" with a select statement. And the select statement 
does not support aggregate functions.</p>
-        <p>Instead of using <code>emitValue</code> to output results, you can 
also use the <code>emitUpdateWithRetract</code> method. Different from 
<code>emitValue</code>, <code>emitUpdateWithRetract</code> is used to emit 
values that have been updated. This method outputs data incrementally in 
retract mode, i.e., once there is an update, we have to retract old records 
before sending new updated ones. The <code>emitUpdateWithRetract</code> method 
will be used in preference to the <code> [...]
+        <p>Instead of using <code>emitValue</code> to output results, you can 
also use the <code>emitUpdateWithRetract</code> method. Different from 
<code>emitValue</code>, <code>emitUpdateWithRetract</code> is used to emit 
values that have been updated. This method outputs data incrementally in 
retract mode, i.e., once there is an update, we have to retract old records 
before sending new updated ones. The <code>emitUpdateWithRetract</code> method 
will be used in preference to the <code> [...]
 {% highlight java %}
 /**
  * Accumulator for Top2.
@@ -2850,7 +2850,7 @@ val table = input
       </td>
       <td>
         <p>Similar to a <b>GroupBy Aggregation</b>. Groups the rows on the 
grouping keys with the following running table aggregation operator to 
aggregate rows group-wise. The difference from an AggregateFunction is that 
TableAggregateFunction may return 0 or more records for a group. You have to 
close the "flatAggregate" with a select statement. And the select statement 
does not support aggregate functions.</p>
-        <p>Instead of using <code>emitValue</code> to output results, you can 
also use the <code>emitUpdateWithRetract</code> method. Different from 
<code>emitValue</code>, <code>emitUpdateWithRetract</code> is used to emit 
values that have been updated. This method outputs data incrementally in 
retract mode, i.e., once there is an update, we have to retract old records 
before sending new updated ones. The <code>emitUpdateWithRetract</code> method 
will be used in preference to the <code> [...]
+        <p>Instead of using <code>emitValue</code> to output results, you can 
also use the <code>emitUpdateWithRetract</code> method. Different from 
<code>emitValue</code>, <code>emitUpdateWithRetract</code> is used to emit 
values that have been updated. This method outputs data incrementally in 
retract mode, i.e., once there is an update, we have to retract old records 
before sending new updated ones. The <code>emitUpdateWithRetract</code> method 
will be used in preference to the <code> [...]
 {% highlight scala %}
 import java.lang.{Integer => JInteger}
 import org.apache.flink.table.api.Types
diff --git a/docs/dev/table/tableApi.zh.md b/docs/dev/table/tableApi.zh.md
index 33ad6f9..7a75c0a 100644
--- a/docs/dev/table/tableApi.zh.md
+++ b/docs/dev/table/tableApi.zh.md
@@ -2644,7 +2644,7 @@ Table table = input
       </td>
       <td>
         <p>Similar to a <b>GroupBy Aggregation</b>. Groups the rows on the 
grouping keys with the following running table aggregation operator to 
aggregate rows group-wise. The difference from an AggregateFunction is that 
TableAggregateFunction may return 0 or more records for a group. You have to 
close the "flatAggregate" with a select statement. And the select statement 
does not support aggregate functions.</p>
-        <p>Instead of using <code>emitValue</code> to output results, you can 
also use the <code>emitUpdateWithRetract</code> method. Different from 
<code>emitValue</code>, <code>emitUpdateWithRetract</code> is used to emit 
values that have been updated. This method outputs data incrementally in 
retract mode, i.e., once there is an update, we have to retract old records 
before sending new updated ones. The <code>emitUpdateWithRetract</code> method 
will be used in preference to the <code> [...]
+        <p>Instead of using <code>emitValue</code> to output results, you can 
also use the <code>emitUpdateWithRetract</code> method. Different from 
<code>emitValue</code>, <code>emitUpdateWithRetract</code> is used to emit 
values that have been updated. This method outputs data incrementally in 
retract mode, i.e., once there is an update, we have to retract old records 
before sending new updated ones. The <code>emitUpdateWithRetract</code> method 
will be used in preference to the <code> [...]
 {% highlight java %}
 /**
  * Accumulator for Top2.
@@ -2849,7 +2849,7 @@ val table = input
       </td>
       <td>
         <p>Similar to a <b>GroupBy Aggregation</b>. Groups the rows on the 
grouping keys with the following running table aggregation operator to 
aggregate rows group-wise. The difference from an AggregateFunction is that 
TableAggregateFunction may return 0 or more records for a group. You have to 
close the "flatAggregate" with a select statement. And the select statement 
does not support aggregate functions.</p>
-        <p>Instead of using <code>emitValue</code> to output results, you can 
also use the <code>emitUpdateWithRetract</code> method. Different from 
<code>emitValue</code>, <code>emitUpdateWithRetract</code> is used to emit 
values that have been updated. This method outputs data incrementally in 
retract mode, i.e., once there is an update, we have to retract old records 
before sending new updated ones. The <code>emitUpdateWithRetract</code> method 
will be used in preference to the <code> [...]
+        <p>Instead of using <code>emitValue</code> to output results, you can 
also use the <code>emitUpdateWithRetract</code> method. Different from 
<code>emitValue</code>, <code>emitUpdateWithRetract</code> is used to emit 
values that have been updated. This method outputs data incrementally in 
retract mode, i.e., once there is an update, we have to retract old records 
before sending new updated ones. The <code>emitUpdateWithRetract</code> method 
will be used in preference to the <code> [...]
 {% highlight scala %}
 import java.lang.{Integer => JInteger}
 import org.apache.flink.table.api.Types
diff --git a/docs/dev/table/udfs.md b/docs/dev/table/udfs.md
index c4179ee..87bc804 100644
--- a/docs/dev/table/udfs.md
+++ b/docs/dev/table/udfs.md
@@ -269,7 +269,7 @@ class CustomTypeSplit extends TableFunction[Row] {
 Aggregation Functions
 ---------------------
 
-User-Defined Aggregate Functions (UDAGGs) aggregate a table (one ore more rows 
with one or more attributes) to a scalar value. 
+User-Defined Aggregate Functions (UDAGGs) aggregate a table (one or more rows 
with one or more attributes) to a scalar value. 
 
 <center>
 <img alt="UDAGG mechanism" src="{{ site.baseurl }}/fig/udagg-mechanism.png" 
width="80%">
@@ -305,23 +305,49 @@ Detailed documentation for all methods of 
`AggregateFunction` is given below.
 <div data-lang="java" markdown="1">
 {% highlight java %}
 /**
-  * Base class for aggregation functions. 
+  * Base class for user-defined aggregates and table aggregates.
   *
-  * @param <T>   the type of the aggregation result
+  * @param <T>   the type of the aggregation result.
   * @param <ACC> the type of the aggregation accumulator. The accumulator is 
used to keep the
   *             aggregated values which are needed to compute an aggregation 
result.
-  *             AggregateFunction represents its state using accumulator, 
thereby the state of the
-  *             AggregateFunction must be put into the accumulator.
   */
-public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
+public abstract class UserDefinedAggregateFunction<T, ACC> extends 
UserDefinedFunction {
 
   /**
-    * Creates and init the Accumulator for this [[AggregateFunction]].
+    * Creates and init the Accumulator for this (table)aggregate function.
     *
     * @return the accumulator with the initial value
     */
   public ACC createAccumulator(); // MANDATORY
 
+  /**
+    * Returns the TypeInformation of the (table)aggregate function's result.
+    *
+    * @return The TypeInformation of the (table)aggregate function's result or 
null if the result
+    *         type should be automatically inferred.
+    */
+  public TypeInformation<T> getResultType = null; // PRE-DEFINED
+
+  /**
+    * Returns the TypeInformation of the (table)aggregate function's 
accumulator.
+    *
+    * @return The TypeInformation of the (table)aggregate function's 
accumulator or null if the
+    *         accumulator type should be automatically inferred.
+    */
+  public TypeInformation<ACC> getAccumulatorType = null; // PRE-DEFINED
+}
+
+/**
+  * Base class for aggregation functions. 
+  *
+  * @param <T>   the type of the aggregation result
+  * @param <ACC> the type of the aggregation accumulator. The accumulator is 
used to keep the
+  *             aggregated values which are needed to compute an aggregation 
result.
+  *             AggregateFunction represents its state using accumulator, 
thereby the state of the
+  *             AggregateFunction must be put into the accumulator.
+  */
+public abstract class AggregateFunction<T, ACC> extends 
UserDefinedAggregateFunction<T, ACC> {
+
   /** Processes the input values and update the provided accumulator instance. 
The method
     * accumulate can be overloaded with different custom types and arguments. 
An AggregateFunction
     * requires at least one accumulate() method.
@@ -350,7 +376,7 @@ public abstract class AggregateFunction<T, ACC> extends 
UserDefinedFunction {
     *                     be noted that the accumulator may contain the 
previous aggregated
     *                     results. Therefore user should not replace or clean 
this instance in the
     *                     custom merge method.
-    * @param its          an [[java.lang.Iterable]] pointed to a group of 
accumulators that will be
+    * @param its          an {@link java.lang.Iterable} pointed to a group of 
accumulators that will be
     *                     merged.
     */
   public void merge(ACC accumulator, java.lang.Iterable<ACC> its); // OPTIONAL
@@ -381,28 +407,45 @@ public abstract class AggregateFunction<T, ACC> extends 
UserDefinedFunction {
     * @return true if the AggregateFunction requires an OVER window, false 
otherwise.
     */
   public Boolean requiresOver = false; // PRE-DEFINED
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+/**
+  * Base class for user-defined aggregates and table aggregates.
+  *
+  * @tparam T   the type of the aggregation result.
+  * @tparam ACC the type of the aggregation accumulator. The accumulator is 
used to keep the
+  *             aggregated values which are needed to compute an aggregation 
result.
+  */
+abstract class UserDefinedAggregateFunction[T, ACC] extends 
UserDefinedFunction {
 
   /**
-    * Returns the TypeInformation of the AggregateFunction's result.
+    * Creates and init the Accumulator for this (table)aggregate function.
     *
-    * @return The TypeInformation of the AggregateFunction's result or null if 
the result type
-    *         should be automatically inferred.
+    * @return the accumulator with the initial value
     */
-  public TypeInformation<T> getResultType = null; // PRE-DEFINED
+  def createAccumulator(): ACC // MANDATORY
+
+  /**
+    * Returns the TypeInformation of the (table)aggregate function's result.
+    *
+    * @return The TypeInformation of the (table)aggregate function's result or 
null if the result
+    *         type should be automatically inferred.
+    */
+  def getResultType: TypeInformation[T] = null // PRE-DEFINED
 
   /**
-    * Returns the TypeInformation of the AggregateFunction's accumulator.
+    * Returns the TypeInformation of the (table)aggregate function's 
accumulator.
     *
-    * @return The TypeInformation of the AggregateFunction's accumulator or 
null if the
+    * @return The TypeInformation of the (table)aggregate function's 
accumulator or null if the
     *         accumulator type should be automatically inferred.
     */
-  public TypeInformation<T> getAccumulatorType = null; // PRE-DEFINED
+  def getAccumulatorType: TypeInformation[ACC] = null // PRE-DEFINED
 }
-{% endhighlight %}
-</div>
 
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
 /**
   * Base class for aggregation functions. 
   *
@@ -412,13 +455,7 @@ public abstract class AggregateFunction<T, ACC> extends 
UserDefinedFunction {
   *             AggregateFunction represents its state using accumulator, 
thereby the state of the
   *             AggregateFunction must be put into the accumulator.
   */
-abstract class AggregateFunction[T, ACC] extends UserDefinedFunction {
-  /**
-    * Creates and init the Accumulator for this [[AggregateFunction]].
-    *
-    * @return the accumulator with the initial value
-    */
-  def createAccumulator(): ACC // MANDATORY
+abstract class AggregateFunction[T, ACC] extends 
UserDefinedAggregateFunction[T, ACC] {
 
   /**
     * Processes the input values and update the provided accumulator instance. 
The method
@@ -480,22 +517,6 @@ abstract class AggregateFunction[T, ACC] extends 
UserDefinedFunction {
     * @return true if the AggregateFunction requires an OVER window, false 
otherwise.
     */
   def requiresOver: Boolean = false // PRE-DEFINED
-
-  /**
-    * Returns the TypeInformation of the AggregateFunction's result.
-    *
-    * @return The TypeInformation of the AggregateFunction's result or null if 
the result type
-    *         should be automatically inferred.
-    */
-  def getResultType: TypeInformation[T] = null // PRE-DEFINED
-
-  /**
-    * Returns the TypeInformation of the AggregateFunction's accumulator.
-    *
-    * @return The TypeInformation of the AggregateFunction's accumulator or 
null if the
-    *         accumulator type should be automatically inferred.
-    */
-  def getAccumulatorType: TypeInformation[ACC] = null // PRE-DEFINED
 }
 {% endhighlight %}
 </div>
@@ -655,6 +676,605 @@ tEnv.sqlQuery("SELECT user, wAvg(points, level) AS 
avgPoints FROM userScores GRO
 
 {% top %}
 
+Table Aggregation Functions
+---------------------
+
+User-Defined Table Aggregate Functions (UDTAGGs) aggregate a table (one or 
more rows with one or more attributes) to a result table with multi rows and 
columns. 
+
+<center>
+<img alt="UDAGG mechanism" src="{{ site.baseurl }}/fig/udtagg-mechanism.png" 
width="80%">
+</center>
+
+The above figure shows an example of a table aggregation. Assume you have a 
table that contains data about beverages. The table consists of three columns, 
`id`, `name` and `price` and 5 rows. Imagine you need to find the top 2 highest 
prices of all beverages in the table, i.e., perform a `top2()` table 
aggregation. You would need to check each of the 5 rows and the result would be 
a table with the top 2 values.
+
+User-defined table aggregation functions are implemented by extending the 
`TableAggregateFunction` class. A `TableAggregateFunction` works as follows. 
First, it needs an `accumulator`, which is the data structure that holds the 
intermediate result of the aggregation. An empty accumulator is created by 
calling the `createAccumulator()` method of the `TableAggregateFunction`. 
Subsequently, the `accumulate()` method of the function is called for each 
input row to update the accumulator. Onc [...]
+
+**The following methods are mandatory for each `TableAggregateFunction`:**
+
+- `createAccumulator()`
+- `accumulate()` 
+
+Flink’s type extraction facilities can fail to identify complex data types, 
e.g., if they are not basic types or simple POJOs. So similar to 
`ScalarFunction` and `TableFunction`, `TableAggregateFunction` provides methods 
to specify the `TypeInformation` of the result type (through 
+ `TableAggregateFunction#getResultType()`) and the type of the accumulator 
(through `TableAggregateFunction#getAccumulatorType()`).
+ 
+Besides the above methods, there are a few contracted methods that can be 
+optionally implemented. While some of these methods allow the system more 
efficient query execution, others are mandatory for certain use cases. For 
instance, the `merge()` method is mandatory if the aggregation function should 
be applied in the context of a session group window (the accumulators of two 
session windows need to be joined when a row is observed that "connects" them). 
+
+**The following methods of `TableAggregateFunction` are required depending on 
the use case:**
+
+- `retract()` is required for aggregations on bounded `OVER` windows.
+- `merge()` is required for many batch aggregations and session window 
aggregations.
+- `resetAccumulator()` is required for many batch aggregations.
+- `emitValue()` is required for batch and window aggregations.
+
+**The following methods of `TableAggregateFunction` are used to improve the 
performance of streaming jobs:**
+
+- `emitUpdateWithRetract()` is used to emit values that have been updated 
under retract mode.
+
+For `emitValue` method, it emits full data according to the accumulator. Take 
TopN as an example, `emitValue` emit all top n values each time. This may bring 
performance problems for streaming jobs. To improve the performance, a user can 
also implement `emitUpdateWithRetract` method to improve the performance. The 
method outputs data incrementally in retract mode, i.e., once there is an 
update, we have to retract old records before sending new updated ones. The 
method will be used in pre [...]
+
+All methods of `TableAggregateFunction` must be declared as `public`, not 
`static` and named exactly as the names mentioned above. The methods 
`createAccumulator`, `getResultType`, and `getAccumulatorType` are defined in 
the parent abstract class of `TableAggregateFunction`, while others are 
contracted methods. In order to define a table aggregate function, one has to 
extend the base class `org.apache.flink.table.functions.TableAggregateFunction` 
and implement one (or more) `accumulate`  [...]
+
+Detailed documentation for all methods of `TableAggregateFunction` is given 
below. 
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+/**
+  * Base class for user-defined aggregates and table aggregates.
+  *
+  * @param <T>   the type of the aggregation result.
+  * @param <ACC> the type of the aggregation accumulator. The accumulator is 
used to keep the
+  *             aggregated values which are needed to compute an aggregation 
result.
+  */
+public abstract class UserDefinedAggregateFunction<T, ACC> extends 
UserDefinedFunction {
+
+  /**
+    * Creates and init the Accumulator for this (table)aggregate function.
+    *
+    * @return the accumulator with the initial value
+    */
+  public ACC createAccumulator(); // MANDATORY
+
+  /**
+    * Returns the TypeInformation of the (table)aggregate function's result.
+    *
+    * @return The TypeInformation of the (table)aggregate function's result or 
null if the result
+    *         type should be automatically inferred.
+    */
+  public TypeInformation<T> getResultType = null; // PRE-DEFINED
+
+  /**
+    * Returns the TypeInformation of the (table)aggregate function's 
accumulator.
+    *
+    * @return The TypeInformation of the (table)aggregate function's 
accumulator or null if the
+    *         accumulator type should be automatically inferred.
+    */
+  public TypeInformation<ACC> getAccumulatorType = null; // PRE-DEFINED
+}
+
+/**
+  * Base class for table aggregation functions. 
+  *
+  * @param <T>   the type of the aggregation result
+  * @param <ACC> the type of the aggregation accumulator. The accumulator is 
used to keep the
+  *             aggregated values which are needed to compute a table 
aggregation result.
+  *             TableAggregateFunction represents its state using accumulator, 
thereby the state of
+  *             the TableAggregateFunction must be put into the accumulator.
+  */
+public abstract class TableAggregateFunction<T, ACC> extends 
UserDefinedAggregateFunction<T, ACC> {
+
+  /** Processes the input values and update the provided accumulator instance. 
The method
+    * accumulate can be overloaded with different custom types and arguments. 
A TableAggregateFunction
+    * requires at least one accumulate() method.
+    *
+    * @param accumulator           the accumulator which contains the current 
aggregated results
+    * @param [user defined inputs] the input value (usually obtained from a 
new arrived data).
+    */
+  public void accumulate(ACC accumulator, [user defined inputs]); // MANDATORY
+
+  /**
+    * Retracts the input values from the accumulator instance. The current 
design assumes the
+    * inputs are the values that have been previously accumulated. The method 
retract can be
+    * overloaded with different custom types and arguments. This function must 
be implemented for
+    * datastream bounded over aggregate.
+    *
+    * @param accumulator           the accumulator which contains the current 
aggregated results
+    * @param [user defined inputs] the input value (usually obtained from a 
new arrived data).
+    */
+  public void retract(ACC accumulator, [user defined inputs]); // OPTIONAL
+
+  /**
+    * Merges a group of accumulator instances into one accumulator instance. 
This function must be
+    * implemented for datastream session window grouping aggregate and dataset 
grouping aggregate.
+    *
+    * @param accumulator  the accumulator which will keep the merged aggregate 
results. It should
+    *                     be noted that the accumulator may contain the 
previous aggregated
+    *                     results. Therefore user should not replace or clean 
this instance in the
+    *                     custom merge method.
+    * @param its          an {@link java.lang.Iterable} pointed to a group of 
accumulators that will be
+    *                     merged.
+    */
+  public void merge(ACC accumulator, java.lang.Iterable<ACC> its); // OPTIONAL
+
+  /**
+    * Called every time when an aggregation result should be materialized. The 
returned value
+    * could be either an early and incomplete result  (periodically emitted as 
data arrive) or
+    * the final result of the  aggregation.
+    *
+    * @param accumulator the accumulator which contains the current
+    *                    aggregated results
+    * @param out         the collector used to output data
+    */
+  public void emitValue(ACC accumulator, Collector<T> out); // OPTIONAL
+  
+  /**
+    * Called every time when an aggregation result should be materialized. The 
returned value
+    * could be either an early and incomplete result (periodically emitted as 
data arrive) or
+    * the final result of the aggregation.
+    *
+    * Different from emitValue, emitUpdateWithRetract is used to emit values 
that have been updated.
+    * This method outputs data incrementally in retract mode, i.e., once there 
is an update, we
+    * have to retract old records before sending new updated ones. The 
emitUpdateWithRetract
+    * method will be used in preference to the emitValue method if both 
methods are defined in the
+    * table aggregate function, because the method is treated to be more 
efficient than emitValue
+    * as it can outputvalues incrementally.
+    *
+    * @param accumulator the accumulator which contains the current
+    *                    aggregated results
+    * @param out         the retractable collector used to output data. Use 
collect method
+    *                    to output(add) records and use retract method to 
retract(delete)
+    *                    records.
+    */
+  public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> 
out); // OPTIONAL
+  
+  /**
+    * Collects a record and forwards it. The collector can output retract 
messages with the retract
+    * method. Note: only use it in {@code emitRetractValueIncrementally}.
+    */
+  public interface RetractableCollector<T> extends Collector<T> {
+
+      /**
+        * Retract a record.
+        *
+        * @param record The record to retract.
+        */
+      void retract(T record);
+  }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+/**
+  * Base class for user-defined aggregates and table aggregates.
+  *
+  * @tparam T   the type of the aggregation result.
+  * @tparam ACC the type of the aggregation accumulator. The accumulator is 
used to keep the
+  *             aggregated values which are needed to compute an aggregation 
result.
+  */
+abstract class UserDefinedAggregateFunction[T, ACC] extends 
UserDefinedFunction {
+
+  /**
+    * Creates and init the Accumulator for this (table)aggregate function.
+    *
+    * @return the accumulator with the initial value
+    */
+  def createAccumulator(): ACC // MANDATORY
+
+  /**
+    * Returns the TypeInformation of the (table)aggregate function's result.
+    *
+    * @return The TypeInformation of the (table)aggregate function's result or 
null if the result
+    *         type should be automatically inferred.
+    */
+  def getResultType: TypeInformation[T] = null // PRE-DEFINED
+
+  /**
+    * Returns the TypeInformation of the (table)aggregate function's 
accumulator.
+    *
+    * @return The TypeInformation of the (table)aggregate function's 
accumulator or null if the
+    *         accumulator type should be automatically inferred.
+    */
+  def getAccumulatorType: TypeInformation[ACC] = null // PRE-DEFINED
+}
+
+/**
+  * Base class for table aggregation functions. 
+  *
+  * @tparam T   the type of the aggregation result
+  * @tparam ACC the type of the aggregation accumulator. The accumulator is 
used to keep the
+  *             aggregated values which are needed to compute an aggregation 
result.
+  *             TableAggregateFunction represents its state using accumulator, 
thereby the state of
+  *             the TableAggregateFunction must be put into the accumulator.
+  */
+abstract class TableAggregateFunction[T, ACC] extends 
UserDefinedAggregateFunction[T, ACC] {
+
+  /**
+    * Processes the input values and update the provided accumulator instance. 
The method
+    * accumulate can be overloaded with different custom types and arguments. 
A TableAggregateFunction
+    * requires at least one accumulate() method.
+    *
+    * @param accumulator           the accumulator which contains the current 
aggregated results
+    * @param [user defined inputs] the input value (usually obtained from a 
new arrived data).
+    */
+  def accumulate(accumulator: ACC, [user defined inputs]): Unit // MANDATORY
+
+  /**
+    * Retracts the input values from the accumulator instance. The current 
design assumes the
+    * inputs are the values that have been previously accumulated. The method 
retract can be
+    * overloaded with different custom types and arguments. This function must 
be implemented for
+    * datastream bounded over aggregate.
+    *
+    * @param accumulator           the accumulator which contains the current 
aggregated results
+    * @param [user defined inputs] the input value (usually obtained from a 
new arrived data).
+    */
+  def retract(accumulator: ACC, [user defined inputs]): Unit // OPTIONAL
+
+  /**
+    * Merges a group of accumulator instances into one accumulator instance. 
This function must be
+    * implemented for datastream session window grouping aggregate and dataset 
grouping aggregate.
+    *
+    * @param accumulator  the accumulator which will keep the merged aggregate 
results. It should
+    *                     be noted that the accumulator may contain the 
previous aggregated
+    *                     results. Therefore user should not replace or clean 
this instance in the
+    *                     custom merge method.
+    * @param its          an [[java.lang.Iterable]] pointed to a group of 
accumulators that will be
+    *                     merged.
+    */
+  def merge(accumulator: ACC, its: java.lang.Iterable[ACC]): Unit // OPTIONAL
+  
+  /**
+    * Called every time when an aggregation result should be materialized. The 
returned value
+    * could be either an early and incomplete result  (periodically emitted as 
data arrive) or
+    * the final result of the  aggregation.
+    *
+    * @param accumulator the accumulator which contains the current
+    *                    aggregated results
+    * @param out         the collector used to output data
+    */
+  def emitValue(accumulator: ACC, out: Collector[T]): Unit // OPTIONAL
+
+  /**
+    * Called every time when an aggregation result should be materialized. The 
returned value
+    * could be either an early and incomplete result (periodically emitted as 
data arrive) or
+    * the final result of the aggregation.
+    *
+    * Different from emitValue, emitUpdateWithRetract is used to emit values 
that have been updated.
+    * This method outputs data incrementally in retract mode, i.e., once there 
is an update, we
+    * have to retract old records before sending new updated ones. The 
emitUpdateWithRetract
+    * method will be used in preference to the emitValue method if both 
methods are defined in the
+    * table aggregate function, because the method is treated to be more 
efficient than emitValue
+    * as it can outputvalues incrementally.
+    *
+    * @param accumulator the accumulator which contains the current
+    *                    aggregated results
+    * @param out         the retractable collector used to output data. Use 
collect method
+    *                    to output(add) records and use retract method to 
retract(delete)
+    *                    records.
+    */
+  def emitUpdateWithRetract(accumulator: ACC, out: RetractableCollector[T]): 
Unit // OPTIONAL
+ 
+  /**
+    * Collects a record and forwards it. The collector can output retract 
messages with the retract
+    * method. Note: only use it in `emitRetractValueIncrementally`.
+    */
+  trait RetractableCollector[T] extends Collector[T] {
+    
+    /**
+      * Retract a record.
+      *
+      * @param record The record to retract.
+      */
+    def retract(record: T): Unit
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+
+The following example shows how to
+
+- define a `TableAggregateFunction` that calculates the top 2 values on a 
given column, 
+- register the function in the `TableEnvironment`, and 
+- use the function in a Table API query(TableAggregateFunction is only 
supported by Table API).  
+
+To calculate the top 2 values, the accumulator needs to store the biggest 2 
values of all the data that has been accumulated. In our example we define a 
class `Top2Accum` to be the accumulator. Accumulators are automatically 
backup-ed by Flink's checkpointing mechanism and restored in case of a failure 
to ensure exactly-once semantics.
+
+The `accumulate()` method of our `Top2` `TableAggregateFunction` has two 
inputs. The first one is the `Top2Accum` accumulator, the other one is the 
user-defined input: input value `v`. Although the `merge()` method is not 
mandatory for most table aggregation types, we provide it below as examples. 
Please note that we used Java primitive types and defined `getResultType()` and 
`getAccumulatorType()` methods in the Scala example because Flink type 
extraction does not work very well for Sca [...]
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+/**
+ * Accumulator for Top2.
+ */
+public class Top2Accum {
+    public Integer first;
+    public Integer second;
+}
+
+/**
+ * The top2 user-defined table aggregate function.
+ */
+public static class Top2 extends TableAggregateFunction<Tuple2<Integer, 
Integer>, Top2Accum> {
+
+    @Override
+    public Top2Accum createAccumulator() {
+        Top2Accum acc = new Top2Accum();
+        acc.first = Integer.MIN_VALUE;
+        acc.second = Integer.MIN_VALUE;
+        return acc;
+    }
+
+
+    public void accumulate(Top2Accum acc, Integer v) {
+        if (v > acc.first) {
+            acc.second = acc.first;
+            acc.first = v;
+        } else if (v > acc.second) {
+            acc.second = v;
+        }
+    }
+
+    public void merge(Top2Accum acc, java.lang.Iterable<Top2Accum> iterable) {
+        for (Top2Accum otherAcc : iterable) {
+            accumulate(acc, otherAcc.first);
+            accumulate(acc, otherAcc.second);
+        }
+    }
+
+    public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> 
out) {
+        // emit the value and rank
+        if (acc.first != Integer.MIN_VALUE) {
+            out.collect(Tuple2.of(acc.first, 1));
+        }
+        if (acc.second != Integer.MIN_VALUE) {
+            out.collect(Tuple2.of(acc.second, 2));
+        }
+    }
+}
+
+// register function
+StreamTableEnvironment tEnv = ...
+tEnv.registerFunction("top2", new Top2());
+
+// init table
+Table tab = ...;
+
+// use function
+tab.groupBy("key")
+    .flatAggregate("top2(a) as (v, rank)")
+    .select("key, v, rank");
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import java.lang.{Integer => JInteger}
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.functions.TableAggregateFunction
+
+/**
+ * Accumulator for top2.
+ */
+class Top2Accum {
+  var first: JInteger = _
+  var second: JInteger = _
+}
+
+/**
+ * The top2 user-defined table aggregate function.
+ */
+class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], 
Top2Accum] {
+
+  override def createAccumulator(): Top2Accum = {
+    val acc = new Top2Accum
+    acc.first = Int.MinValue
+    acc.second = Int.MinValue
+    acc
+  }
+
+  def accumulate(acc: Top2Accum, v: Int) {
+    if (v > acc.first) {
+      acc.second = acc.first
+      acc.first = v
+    } else if (v > acc.second) {
+      acc.second = v
+    }
+  }
+
+  def merge(acc: Top2Accum, its: JIterable[Top2Accum]): Unit = {
+    val iter = its.iterator()
+    while (iter.hasNext) {
+      val top2 = iter.next()
+      accumulate(acc, top2.first)
+      accumulate(acc, top2.second)
+    }
+  }
+
+  def emitValue(acc: Top2Accum, out: Collector[JTuple2[JInteger, JInteger]]): 
Unit = {
+    // emit the value and rank
+    if (acc.first != Int.MinValue) {
+      out.collect(JTuple2.of(acc.first, 1))
+    }
+    if (acc.second != Int.MinValue) {
+      out.collect(JTuple2.of(acc.second, 2))
+    }
+  }
+}
+
+// init table
+val tab = ...
+
+// use function
+tab
+  .groupBy('key)
+  .flatAggregate(top2('a) as ('v, 'rank))
+  .select('key, 'v, 'rank)
+
+{% endhighlight %}
+</div>
+</div>
+
+
+The following example shows how to use `emitUpdateWithRetract` method to emit 
only updates. To emit only updates, in our example, the accumulator keeps both 
old and new top 2 values. Note: if the N of topN is big, it may inefficient to 
keep both old and new values. One way to solve this case is to store the input 
record into the accumulator in `accumulate` method and then perform calculation 
in `emitUpdateWithRetract`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+/**
+ * Accumulator for Top2.
+ */
+public class Top2Accum {
+    public Integer first;
+    public Integer second;
+    public Integer oldFirst;
+    public Integer oldSecond;
+}
+
+/**
+ * The top2 user-defined table aggregate function.
+ */
+public static class Top2 extends TableAggregateFunction<Tuple2<Integer, 
Integer>, Top2Accum> {
+
+    @Override
+    public Top2Accum createAccumulator() {
+        Top2Accum acc = new Top2Accum();
+        acc.first = Integer.MIN_VALUE;
+        acc.second = Integer.MIN_VALUE;
+        acc.oldFirst = Integer.MIN_VALUE;
+        acc.oldSecond = Integer.MIN_VALUE;
+        return acc;
+    }
+
+    public void accumulate(Top2Accum acc, Integer v) {
+        if (v > acc.first) {
+            acc.second = acc.first;
+            acc.first = v;
+        } else if (v > acc.second) {
+            acc.second = v;
+        }
+    }
+
+    public void emitUpdateWithRetract(Top2Accum acc, 
RetractableCollector<Tuple2<Integer, Integer>> out) {
+        if (!acc.first.equals(acc.oldFirst)) {
+            // if there is an update, retract old value then emit new value.
+            if (acc.oldFirst != Integer.MIN_VALUE) {
+                out.retract(Tuple2.of(acc.oldFirst, 1));
+            }
+            out.collect(Tuple2.of(acc.first, 1));
+            acc.oldFirst = acc.first;
+        }
+
+        if (!acc.second.equals(acc.oldSecond)) {
+            // if there is an update, retract old value then emit new value.
+            if (acc.oldSecond != Integer.MIN_VALUE) {
+                out.retract(Tuple2.of(acc.oldSecond, 2));
+            }
+            out.collect(Tuple2.of(acc.second, 2));
+            acc.oldSecond = acc.second;
+        }
+    }
+}
+
+// register function
+StreamTableEnvironment tEnv = ...
+tEnv.registerFunction("top2", new Top2());
+
+// init table
+Table tab = ...;
+
+// use function
+tab.groupBy("key")
+    .flatAggregate("top2(a) as (v, rank)")
+    .select("key, v, rank");
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import java.lang.{Integer => JInteger}
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.functions.TableAggregateFunction
+
+/**
+ * Accumulator for top2.
+ */
+class Top2Accum {
+  var first: JInteger = _
+  var second: JInteger = _
+  var oldFirst: JInteger = _
+  var oldSecond: JInteger = _
+}
+
+/**
+ * The top2 user-defined table aggregate function.
+ */
+class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], 
Top2Accum] {
+
+  override def createAccumulator(): Top2Accum = {
+    val acc = new Top2Accum
+    acc.first = Int.MinValue
+    acc.second = Int.MinValue
+    acc.oldFirst = Int.MinValue
+    acc.oldSecond = Int.MinValue
+    acc
+  }
+
+  def accumulate(acc: Top2Accum, v: Int) {
+    if (v > acc.first) {
+      acc.second = acc.first
+      acc.first = v
+    } else if (v > acc.second) {
+      acc.second = v
+    }
+  }
+
+  def emitUpdateWithRetract(
+    acc: Top2Accum,
+    out: RetractableCollector[JTuple2[JInteger, JInteger]])
+  : Unit = {
+    if (acc.first != acc.oldFirst) {
+      // if there is an update, retract old value then emit new value.
+      if (acc.oldFirst != Int.MinValue) {
+        out.retract(JTuple2.of(acc.oldFirst, 1))
+      }
+      out.collect(JTuple2.of(acc.first, 1))
+      acc.oldFirst = acc.first
+    }
+    if (acc.second != acc.oldSecond) {
+      // if there is an update, retract old value then emit new value.
+      if (acc.oldSecond != Int.MinValue) {
+        out.retract(JTuple2.of(acc.oldSecond, 2))
+      }
+      out.collect(JTuple2.of(acc.second, 2))
+      acc.oldSecond = acc.second
+    }
+  }
+}
+
+// init table
+val tab = ...
+
+// use function
+tab
+  .groupBy('key)
+  .flatAggregate(top2('a) as ('v, 'rank))
+  .select('key, 'v, 'rank)
+
+{% endhighlight %}
+</div>
+</div>
+
+
+{% top %}
+
 Best Practices for Implementing UDFs
 ------------------------------------
 
diff --git a/docs/dev/table/udfs.zh.md b/docs/dev/table/udfs.zh.md
index 3ce2f3f..724e5bb 100644
--- a/docs/dev/table/udfs.zh.md
+++ b/docs/dev/table/udfs.zh.md
@@ -269,7 +269,7 @@ class CustomTypeSplit extends TableFunction[Row] {
 Aggregation Functions
 ---------------------
 
-User-Defined Aggregate Functions (UDAGGs) aggregate a table (one ore more rows 
with one or more attributes) to a scalar value. 
+User-Defined Aggregate Functions (UDAGGs) aggregate a table (one or more rows 
with one or more attributes) to a scalar value. 
 
 <center>
 <img alt="UDAGG mechanism" src="{{ site.baseurl }}/fig/udagg-mechanism.png" 
width="80%">
@@ -305,23 +305,49 @@ Detailed documentation for all methods of 
`AggregateFunction` is given below.
 <div data-lang="java" markdown="1">
 {% highlight java %}
 /**
-  * Base class for aggregation functions. 
+  * Base class for user-defined aggregates and table aggregates.
   *
-  * @param <T>   the type of the aggregation result
+  * @param <T>   the type of the aggregation result.
   * @param <ACC> the type of the aggregation accumulator. The accumulator is 
used to keep the
   *             aggregated values which are needed to compute an aggregation 
result.
-  *             AggregateFunction represents its state using accumulator, 
thereby the state of the
-  *             AggregateFunction must be put into the accumulator.
   */
-public abstract class AggregateFunction<T, ACC> extends UserDefinedFunction {
+public abstract class UserDefinedAggregateFunction<T, ACC> extends 
UserDefinedFunction {
 
   /**
-    * Creates and init the Accumulator for this [[AggregateFunction]].
+    * Creates and init the Accumulator for this (table)aggregate function.
     *
     * @return the accumulator with the initial value
     */
   public ACC createAccumulator(); // MANDATORY
 
+  /**
+    * Returns the TypeInformation of the (table)aggregate function's result.
+    *
+    * @return The TypeInformation of the (table)aggregate function's result or 
null if the result
+    *         type should be automatically inferred.
+    */
+  public TypeInformation<T> getResultType = null; // PRE-DEFINED
+
+  /**
+    * Returns the TypeInformation of the (table)aggregate function's 
accumulator.
+    *
+    * @return The TypeInformation of the (table)aggregate function's 
accumulator or null if the
+    *         accumulator type should be automatically inferred.
+    */
+  public TypeInformation<ACC> getAccumulatorType = null; // PRE-DEFINED
+}
+
+/**
+  * Base class for aggregation functions. 
+  *
+  * @param <T>   the type of the aggregation result
+  * @param <ACC> the type of the aggregation accumulator. The accumulator is 
used to keep the
+  *             aggregated values which are needed to compute an aggregation 
result.
+  *             AggregateFunction represents its state using accumulator, 
thereby the state of the
+  *             AggregateFunction must be put into the accumulator.
+  */
+public abstract class AggregateFunction<T, ACC> extends 
UserDefinedAggregateFunction<T, ACC> {
+
   /** Processes the input values and update the provided accumulator instance. 
The method
     * accumulate can be overloaded with different custom types and arguments. 
An AggregateFunction
     * requires at least one accumulate() method.
@@ -350,7 +376,7 @@ public abstract class AggregateFunction<T, ACC> extends 
UserDefinedFunction {
     *                     be noted that the accumulator may contain the 
previous aggregated
     *                     results. Therefore user should not replace or clean 
this instance in the
     *                     custom merge method.
-    * @param its          an [[java.lang.Iterable]] pointed to a group of 
accumulators that will be
+    * @param its          an {@link java.lang.Iterable} pointed to a group of 
accumulators that will be
     *                     merged.
     */
   public void merge(ACC accumulator, java.lang.Iterable<ACC> its); // OPTIONAL
@@ -381,28 +407,45 @@ public abstract class AggregateFunction<T, ACC> extends 
UserDefinedFunction {
     * @return true if the AggregateFunction requires an OVER window, false 
otherwise.
     */
   public Boolean requiresOver = false; // PRE-DEFINED
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+/**
+  * Base class for user-defined aggregates and table aggregates.
+  *
+  * @tparam T   the type of the aggregation result.
+  * @tparam ACC the type of the aggregation accumulator. The accumulator is 
used to keep the
+  *             aggregated values which are needed to compute an aggregation 
result.
+  */
+abstract class UserDefinedAggregateFunction[T, ACC] extends 
UserDefinedFunction {
 
   /**
-    * Returns the TypeInformation of the AggregateFunction's result.
+    * Creates and init the Accumulator for this (table)aggregate function.
     *
-    * @return The TypeInformation of the AggregateFunction's result or null if 
the result type
-    *         should be automatically inferred.
+    * @return the accumulator with the initial value
     */
-  public TypeInformation<T> getResultType = null; // PRE-DEFINED
+  def createAccumulator(): ACC // MANDATORY
+
+  /**
+    * Returns the TypeInformation of the (table)aggregate function's result.
+    *
+    * @return The TypeInformation of the (table)aggregate function's result or 
null if the result
+    *         type should be automatically inferred.
+    */
+  def getResultType: TypeInformation[T] = null // PRE-DEFINED
 
   /**
-    * Returns the TypeInformation of the AggregateFunction's accumulator.
+    * Returns the TypeInformation of the (table)aggregate function's 
accumulator.
     *
-    * @return The TypeInformation of the AggregateFunction's accumulator or 
null if the
+    * @return The TypeInformation of the (table)aggregate function's 
accumulator or null if the
     *         accumulator type should be automatically inferred.
     */
-  public TypeInformation<T> getAccumulatorType = null; // PRE-DEFINED
+  def getAccumulatorType: TypeInformation[ACC] = null // PRE-DEFINED
 }
-{% endhighlight %}
-</div>
 
-<div data-lang="scala" markdown="1">
-{% highlight scala %}
 /**
   * Base class for aggregation functions. 
   *
@@ -412,13 +455,7 @@ public abstract class AggregateFunction<T, ACC> extends 
UserDefinedFunction {
   *             AggregateFunction represents its state using accumulator, 
thereby the state of the
   *             AggregateFunction must be put into the accumulator.
   */
-abstract class AggregateFunction[T, ACC] extends UserDefinedFunction {
-  /**
-    * Creates and init the Accumulator for this [[AggregateFunction]].
-    *
-    * @return the accumulator with the initial value
-    */
-  def createAccumulator(): ACC // MANDATORY
+abstract class AggregateFunction[T, ACC] extends 
UserDefinedAggregateFunction[T, ACC] {
 
   /**
     * Processes the input values and update the provided accumulator instance. 
The method
@@ -480,22 +517,6 @@ abstract class AggregateFunction[T, ACC] extends 
UserDefinedFunction {
     * @return true if the AggregateFunction requires an OVER window, false 
otherwise.
     */
   def requiresOver: Boolean = false // PRE-DEFINED
-
-  /**
-    * Returns the TypeInformation of the AggregateFunction's result.
-    *
-    * @return The TypeInformation of the AggregateFunction's result or null if 
the result type
-    *         should be automatically inferred.
-    */
-  def getResultType: TypeInformation[T] = null // PRE-DEFINED
-
-  /**
-    * Returns the TypeInformation of the AggregateFunction's accumulator.
-    *
-    * @return The TypeInformation of the AggregateFunction's accumulator or 
null if the
-    *         accumulator type should be automatically inferred.
-    */
-  def getAccumulatorType: TypeInformation[ACC] = null // PRE-DEFINED
 }
 {% endhighlight %}
 </div>
@@ -655,6 +676,605 @@ tEnv.sqlQuery("SELECT user, wAvg(points, level) AS 
avgPoints FROM userScores GRO
 
 {% top %}
 
+Table Aggregation Functions
+---------------------
+
+User-Defined Table Aggregate Functions (UDTAGGs) aggregate a table (one or 
more rows with one or more attributes) to a result table with multi rows and 
columns. 
+
+<center>
+<img alt="UDAGG mechanism" src="{{ site.baseurl }}/fig/udtagg-mechanism.png" 
width="80%">
+</center>
+
+The above figure shows an example of a table aggregation. Assume you have a 
table that contains data about beverages. The table consists of three columns, 
`id`, `name` and `price` and 5 rows. Imagine you need to find the top 2 highest 
prices of all beverages in the table, i.e., perform a `top2()` table 
aggregation. You would need to check each of the 5 rows and the result would be 
a table with the top 2 values.
+
+User-defined table aggregation functions are implemented by extending the 
`TableAggregateFunction` class. A `TableAggregateFunction` works as follows. 
First, it needs an `accumulator`, which is the data structure that holds the 
intermediate result of the aggregation. An empty accumulator is created by 
calling the `createAccumulator()` method of the `TableAggregateFunction`. 
Subsequently, the `accumulate()` method of the function is called for each 
input row to update the accumulator. Onc [...]
+
+**The following methods are mandatory for each `TableAggregateFunction`:**
+
+- `createAccumulator()`
+- `accumulate()` 
+
+Flink’s type extraction facilities can fail to identify complex data types, 
e.g., if they are not basic types or simple POJOs. So similar to 
`ScalarFunction` and `TableFunction`, `TableAggregateFunction` provides methods 
to specify the `TypeInformation` of the result type (through 
+ `TableAggregateFunction#getResultType()`) and the type of the accumulator 
(through `TableAggregateFunction#getAccumulatorType()`).
+ 
+Besides the above methods, there are a few contracted methods that can be 
+optionally implemented. While some of these methods allow the system more 
efficient query execution, others are mandatory for certain use cases. For 
instance, the `merge()` method is mandatory if the aggregation function should 
be applied in the context of a session group window (the accumulators of two 
session windows need to be joined when a row is observed that "connects" them). 
+
+**The following methods of `TableAggregateFunction` are required depending on 
the use case:**
+
+- `retract()` is required for aggregations on bounded `OVER` windows.
+- `merge()` is required for many batch aggregations and session window 
aggregations.
+- `resetAccumulator()` is required for many batch aggregations.
+- `emitValue()` is required for batch and window aggregations.
+
+**The following methods of `TableAggregateFunction` are used to improve the 
performance of streaming jobs:**
+
+- `emitUpdateWithRetract()` is used to emit values that have been updated 
under retract mode.
+
+For `emitValue` method, it emits full data according to the accumulator. Take 
TopN as an example, `emitValue` emit all top n values each time. This may bring 
performance problems for streaming jobs. To improve the performance, a user can 
also implement `emitUpdateWithRetract` method to improve the performance. The 
method outputs data incrementally in retract mode, i.e., once there is an 
update, we have to retract old records before sending new updated ones. The 
method will be used in pre [...]
+
+All methods of `TableAggregateFunction` must be declared as `public`, not 
`static` and named exactly as the names mentioned above. The methods 
`createAccumulator`, `getResultType`, and `getAccumulatorType` are defined in 
the parent abstract class of `TableAggregateFunction`, while others are 
contracted methods. In order to define a table aggregate function, one has to 
extend the base class `org.apache.flink.table.functions.TableAggregateFunction` 
and implement one (or more) `accumulate`  [...]
+
+Detailed documentation for all methods of `TableAggregateFunction` is given 
below. 
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+/**
+  * Base class for user-defined aggregates and table aggregates.
+  *
+  * @param <T>   the type of the aggregation result.
+  * @param <ACC> the type of the aggregation accumulator. The accumulator is 
used to keep the
+  *             aggregated values which are needed to compute an aggregation 
result.
+  */
+public abstract class UserDefinedAggregateFunction<T, ACC> extends 
UserDefinedFunction {
+
+  /**
+    * Creates and init the Accumulator for this (table)aggregate function.
+    *
+    * @return the accumulator with the initial value
+    */
+  public ACC createAccumulator(); // MANDATORY
+
+  /**
+    * Returns the TypeInformation of the (table)aggregate function's result.
+    *
+    * @return The TypeInformation of the (table)aggregate function's result or 
null if the result
+    *         type should be automatically inferred.
+    */
+  public TypeInformation<T> getResultType = null; // PRE-DEFINED
+
+  /**
+    * Returns the TypeInformation of the (table)aggregate function's 
accumulator.
+    *
+    * @return The TypeInformation of the (table)aggregate function's 
accumulator or null if the
+    *         accumulator type should be automatically inferred.
+    */
+  public TypeInformation<ACC> getAccumulatorType = null; // PRE-DEFINED
+}
+
+/**
+  * Base class for table aggregation functions. 
+  *
+  * @param <T>   the type of the aggregation result
+  * @param <ACC> the type of the aggregation accumulator. The accumulator is 
used to keep the
+  *             aggregated values which are needed to compute a table 
aggregation result.
+  *             TableAggregateFunction represents its state using accumulator, 
thereby the state of
+  *             the TableAggregateFunction must be put into the accumulator.
+  */
+public abstract class TableAggregateFunction<T, ACC> extends 
UserDefinedAggregateFunction<T, ACC> {
+
+  /** Processes the input values and update the provided accumulator instance. 
The method
+    * accumulate can be overloaded with different custom types and arguments. 
A TableAggregateFunction
+    * requires at least one accumulate() method.
+    *
+    * @param accumulator           the accumulator which contains the current 
aggregated results
+    * @param [user defined inputs] the input value (usually obtained from a 
new arrived data).
+    */
+  public void accumulate(ACC accumulator, [user defined inputs]); // MANDATORY
+
+  /**
+    * Retracts the input values from the accumulator instance. The current 
design assumes the
+    * inputs are the values that have been previously accumulated. The method 
retract can be
+    * overloaded with different custom types and arguments. This function must 
be implemented for
+    * datastream bounded over aggregate.
+    *
+    * @param accumulator           the accumulator which contains the current 
aggregated results
+    * @param [user defined inputs] the input value (usually obtained from a 
new arrived data).
+    */
+  public void retract(ACC accumulator, [user defined inputs]); // OPTIONAL
+
+  /**
+    * Merges a group of accumulator instances into one accumulator instance. 
This function must be
+    * implemented for datastream session window grouping aggregate and dataset 
grouping aggregate.
+    *
+    * @param accumulator  the accumulator which will keep the merged aggregate 
results. It should
+    *                     be noted that the accumulator may contain the 
previous aggregated
+    *                     results. Therefore user should not replace or clean 
this instance in the
+    *                     custom merge method.
+    * @param its          an {@link java.lang.Iterable} pointed to a group of 
accumulators that will be
+    *                     merged.
+    */
+  public void merge(ACC accumulator, java.lang.Iterable<ACC> its); // OPTIONAL
+
+  /**
+    * Called every time when an aggregation result should be materialized. The 
returned value
+    * could be either an early and incomplete result  (periodically emitted as 
data arrive) or
+    * the final result of the  aggregation.
+    *
+    * @param accumulator the accumulator which contains the current
+    *                    aggregated results
+    * @param out         the collector used to output data
+    */
+  public void emitValue(ACC accumulator, Collector<T> out); // OPTIONAL
+  
+  /**
+    * Called every time when an aggregation result should be materialized. The 
returned value
+    * could be either an early and incomplete result (periodically emitted as 
data arrive) or
+    * the final result of the aggregation.
+    *
+    * Different from emitValue, emitUpdateWithRetract is used to emit values 
that have been updated.
+    * This method outputs data incrementally in retract mode, i.e., once there 
is an update, we
+    * have to retract old records before sending new updated ones. The 
emitUpdateWithRetract
+    * method will be used in preference to the emitValue method if both 
methods are defined in the
+    * table aggregate function, because the method is treated to be more 
efficient than emitValue
+    * as it can outputvalues incrementally.
+    *
+    * @param accumulator the accumulator which contains the current
+    *                    aggregated results
+    * @param out         the retractable collector used to output data. Use 
collect method
+    *                    to output(add) records and use retract method to 
retract(delete)
+    *                    records.
+    */
+  public void emitUpdateWithRetract(ACC accumulator, RetractableCollector<T> 
out); // OPTIONAL
+  
+  /**
+    * Collects a record and forwards it. The collector can output retract 
messages with the retract
+    * method. Note: only use it in {@code emitRetractValueIncrementally}.
+    */
+  public interface RetractableCollector<T> extends Collector<T> {
+
+      /**
+        * Retract a record.
+        *
+        * @param record The record to retract.
+        */
+      void retract(T record);
+  }
+}
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+/**
+  * Base class for user-defined aggregates and table aggregates.
+  *
+  * @tparam T   the type of the aggregation result.
+  * @tparam ACC the type of the aggregation accumulator. The accumulator is 
used to keep the
+  *             aggregated values which are needed to compute an aggregation 
result.
+  */
+abstract class UserDefinedAggregateFunction[T, ACC] extends 
UserDefinedFunction {
+
+  /**
+    * Creates and init the Accumulator for this (table)aggregate function.
+    *
+    * @return the accumulator with the initial value
+    */
+  def createAccumulator(): ACC // MANDATORY
+
+  /**
+    * Returns the TypeInformation of the (table)aggregate function's result.
+    *
+    * @return The TypeInformation of the (table)aggregate function's result or 
null if the result
+    *         type should be automatically inferred.
+    */
+  def getResultType: TypeInformation[T] = null // PRE-DEFINED
+
+  /**
+    * Returns the TypeInformation of the (table)aggregate function's 
accumulator.
+    *
+    * @return The TypeInformation of the (table)aggregate function's 
accumulator or null if the
+    *         accumulator type should be automatically inferred.
+    */
+  def getAccumulatorType: TypeInformation[ACC] = null // PRE-DEFINED
+}
+
+/**
+  * Base class for table aggregation functions. 
+  *
+  * @tparam T   the type of the aggregation result
+  * @tparam ACC the type of the aggregation accumulator. The accumulator is 
used to keep the
+  *             aggregated values which are needed to compute an aggregation 
result.
+  *             TableAggregateFunction represents its state using accumulator, 
thereby the state of
+  *             the TableAggregateFunction must be put into the accumulator.
+  */
+abstract class TableAggregateFunction[T, ACC] extends 
UserDefinedAggregateFunction[T, ACC] {
+
+  /**
+    * Processes the input values and update the provided accumulator instance. 
The method
+    * accumulate can be overloaded with different custom types and arguments. 
A TableAggregateFunction
+    * requires at least one accumulate() method.
+    *
+    * @param accumulator           the accumulator which contains the current 
aggregated results
+    * @param [user defined inputs] the input value (usually obtained from a 
new arrived data).
+    */
+  def accumulate(accumulator: ACC, [user defined inputs]): Unit // MANDATORY
+
+  /**
+    * Retracts the input values from the accumulator instance. The current 
design assumes the
+    * inputs are the values that have been previously accumulated. The method 
retract can be
+    * overloaded with different custom types and arguments. This function must 
be implemented for
+    * datastream bounded over aggregate.
+    *
+    * @param accumulator           the accumulator which contains the current 
aggregated results
+    * @param [user defined inputs] the input value (usually obtained from a 
new arrived data).
+    */
+  def retract(accumulator: ACC, [user defined inputs]): Unit // OPTIONAL
+
+  /**
+    * Merges a group of accumulator instances into one accumulator instance. 
This function must be
+    * implemented for datastream session window grouping aggregate and dataset 
grouping aggregate.
+    *
+    * @param accumulator  the accumulator which will keep the merged aggregate 
results. It should
+    *                     be noted that the accumulator may contain the 
previous aggregated
+    *                     results. Therefore user should not replace or clean 
this instance in the
+    *                     custom merge method.
+    * @param its          an [[java.lang.Iterable]] pointed to a group of 
accumulators that will be
+    *                     merged.
+    */
+  def merge(accumulator: ACC, its: java.lang.Iterable[ACC]): Unit // OPTIONAL
+  
+  /**
+    * Called every time when an aggregation result should be materialized. The 
returned value
+    * could be either an early and incomplete result  (periodically emitted as 
data arrive) or
+    * the final result of the  aggregation.
+    *
+    * @param accumulator the accumulator which contains the current
+    *                    aggregated results
+    * @param out         the collector used to output data
+    */
+  def emitValue(accumulator: ACC, out: Collector[T]): Unit // OPTIONAL
+
+  /**
+    * Called every time when an aggregation result should be materialized. The 
returned value
+    * could be either an early and incomplete result (periodically emitted as 
data arrive) or
+    * the final result of the aggregation.
+    *
+    * Different from emitValue, emitUpdateWithRetract is used to emit values 
that have been updated.
+    * This method outputs data incrementally in retract mode, i.e., once there 
is an update, we
+    * have to retract old records before sending new updated ones. The 
emitUpdateWithRetract
+    * method will be used in preference to the emitValue method if both 
methods are defined in the
+    * table aggregate function, because the method is treated to be more 
efficient than emitValue
+    * as it can outputvalues incrementally.
+    *
+    * @param accumulator the accumulator which contains the current
+    *                    aggregated results
+    * @param out         the retractable collector used to output data. Use 
collect method
+    *                    to output(add) records and use retract method to 
retract(delete)
+    *                    records.
+    */
+  def emitUpdateWithRetract(accumulator: ACC, out: RetractableCollector[T]): 
Unit // OPTIONAL
+ 
+  /**
+    * Collects a record and forwards it. The collector can output retract 
messages with the retract
+    * method. Note: only use it in `emitRetractValueIncrementally`.
+    */
+  trait RetractableCollector[T] extends Collector[T] {
+    
+    /**
+      * Retract a record.
+      *
+      * @param record The record to retract.
+      */
+    def retract(record: T): Unit
+  }
+}
+{% endhighlight %}
+</div>
+</div>
+
+
+The following example shows how to
+
+- define a `TableAggregateFunction` that calculates the top 2 values on a 
given column, 
+- register the function in the `TableEnvironment`, and 
+- use the function in a Table API query(TableAggregateFunction is only 
supported by Table API).  
+
+To calculate the top 2 values, the accumulator needs to store the biggest 2 
values of all the data that has been accumulated. In our example we define a 
class `Top2Accum` to be the accumulator. Accumulators are automatically 
backup-ed by Flink's checkpointing mechanism and restored in case of a failure 
to ensure exactly-once semantics.
+
+The `accumulate()` method of our `Top2` `TableAggregateFunction` has two 
inputs. The first one is the `Top2Accum` accumulator, the other one is the 
user-defined input: input value `v`. Although the `merge()` method is not 
mandatory for most table aggregation types, we provide it below as examples. 
Please note that we used Java primitive types and defined `getResultType()` and 
`getAccumulatorType()` methods in the Scala example because Flink type 
extraction does not work very well for Sca [...]
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+/**
+ * Accumulator for Top2.
+ */
+public class Top2Accum {
+    public Integer first;
+    public Integer second;
+}
+
+/**
+ * The top2 user-defined table aggregate function.
+ */
+public static class Top2 extends TableAggregateFunction<Tuple2<Integer, 
Integer>, Top2Accum> {
+
+    @Override
+    public Top2Accum createAccumulator() {
+        Top2Accum acc = new Top2Accum();
+        acc.first = Integer.MIN_VALUE;
+        acc.second = Integer.MIN_VALUE;
+        return acc;
+    }
+
+
+    public void accumulate(Top2Accum acc, Integer v) {
+        if (v > acc.first) {
+            acc.second = acc.first;
+            acc.first = v;
+        } else if (v > acc.second) {
+            acc.second = v;
+        }
+    }
+
+    public void merge(Top2Accum acc, java.lang.Iterable<Top2Accum> iterable) {
+        for (Top2Accum otherAcc : iterable) {
+            accumulate(acc, otherAcc.first);
+            accumulate(acc, otherAcc.second);
+        }
+    }
+
+    public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> 
out) {
+        // emit the value and rank
+        if (acc.first != Integer.MIN_VALUE) {
+            out.collect(Tuple2.of(acc.first, 1));
+        }
+        if (acc.second != Integer.MIN_VALUE) {
+            out.collect(Tuple2.of(acc.second, 2));
+        }
+    }
+}
+
+// register function
+StreamTableEnvironment tEnv = ...
+tEnv.registerFunction("top2", new Top2());
+
+// init table
+Table tab = ...;
+
+// use function
+tab.groupBy("key")
+    .flatAggregate("top2(a) as (v, rank)")
+    .select("key, v, rank");
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import java.lang.{Integer => JInteger}
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.functions.TableAggregateFunction
+
+/**
+ * Accumulator for top2.
+ */
+class Top2Accum {
+  var first: JInteger = _
+  var second: JInteger = _
+}
+
+/**
+ * The top2 user-defined table aggregate function.
+ */
+class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], 
Top2Accum] {
+
+  override def createAccumulator(): Top2Accum = {
+    val acc = new Top2Accum
+    acc.first = Int.MinValue
+    acc.second = Int.MinValue
+    acc
+  }
+
+  def accumulate(acc: Top2Accum, v: Int) {
+    if (v > acc.first) {
+      acc.second = acc.first
+      acc.first = v
+    } else if (v > acc.second) {
+      acc.second = v
+    }
+  }
+
+  def merge(acc: Top2Accum, its: JIterable[Top2Accum]): Unit = {
+    val iter = its.iterator()
+    while (iter.hasNext) {
+      val top2 = iter.next()
+      accumulate(acc, top2.first)
+      accumulate(acc, top2.second)
+    }
+  }
+
+  def emitValue(acc: Top2Accum, out: Collector[JTuple2[JInteger, JInteger]]): 
Unit = {
+    // emit the value and rank
+    if (acc.first != Int.MinValue) {
+      out.collect(JTuple2.of(acc.first, 1))
+    }
+    if (acc.second != Int.MinValue) {
+      out.collect(JTuple2.of(acc.second, 2))
+    }
+  }
+}
+
+// init table
+val tab = ...
+
+// use function
+tab
+  .groupBy('key)
+  .flatAggregate(top2('a) as ('v, 'rank))
+  .select('key, 'v, 'rank)
+
+{% endhighlight %}
+</div>
+</div>
+
+
+The following example shows how to use `emitUpdateWithRetract` method to emit 
only updates. To emit only updates, in our example, the accumulator keeps both 
old and new top 2 values. Note: if the N of topN is big, it may inefficient to 
keep both old and new values. One way to solve this case is to store the input 
record into the accumulator in `accumulate` method and then perform calculation 
in `emitUpdateWithRetract`.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+/**
+ * Accumulator for Top2.
+ */
+public class Top2Accum {
+    public Integer first;
+    public Integer second;
+    public Integer oldFirst;
+    public Integer oldSecond;
+}
+
+/**
+ * The top2 user-defined table aggregate function.
+ */
+public static class Top2 extends TableAggregateFunction<Tuple2<Integer, 
Integer>, Top2Accum> {
+
+    @Override
+    public Top2Accum createAccumulator() {
+        Top2Accum acc = new Top2Accum();
+        acc.first = Integer.MIN_VALUE;
+        acc.second = Integer.MIN_VALUE;
+        acc.oldFirst = Integer.MIN_VALUE;
+        acc.oldSecond = Integer.MIN_VALUE;
+        return acc;
+    }
+
+    public void accumulate(Top2Accum acc, Integer v) {
+        if (v > acc.first) {
+            acc.second = acc.first;
+            acc.first = v;
+        } else if (v > acc.second) {
+            acc.second = v;
+        }
+    }
+
+    public void emitUpdateWithRetract(Top2Accum acc, 
RetractableCollector<Tuple2<Integer, Integer>> out) {
+        if (!acc.first.equals(acc.oldFirst)) {
+            // if there is an update, retract old value then emit new value.
+            if (acc.oldFirst != Integer.MIN_VALUE) {
+                out.retract(Tuple2.of(acc.oldFirst, 1));
+            }
+            out.collect(Tuple2.of(acc.first, 1));
+            acc.oldFirst = acc.first;
+        }
+
+        if (!acc.second.equals(acc.oldSecond)) {
+            // if there is an update, retract old value then emit new value.
+            if (acc.oldSecond != Integer.MIN_VALUE) {
+                out.retract(Tuple2.of(acc.oldSecond, 2));
+            }
+            out.collect(Tuple2.of(acc.second, 2));
+            acc.oldSecond = acc.second;
+        }
+    }
+}
+
+// register function
+StreamTableEnvironment tEnv = ...
+tEnv.registerFunction("top2", new Top2());
+
+// init table
+Table tab = ...;
+
+// use function
+tab.groupBy("key")
+    .flatAggregate("top2(a) as (v, rank)")
+    .select("key, v, rank");
+
+{% endhighlight %}
+</div>
+
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+import java.lang.{Integer => JInteger}
+import org.apache.flink.table.api.Types
+import org.apache.flink.table.functions.TableAggregateFunction
+
+/**
+ * Accumulator for top2.
+ */
+class Top2Accum {
+  var first: JInteger = _
+  var second: JInteger = _
+  var oldFirst: JInteger = _
+  var oldSecond: JInteger = _
+}
+
+/**
+ * The top2 user-defined table aggregate function.
+ */
+class Top2 extends TableAggregateFunction[JTuple2[JInteger, JInteger], 
Top2Accum] {
+
+  override def createAccumulator(): Top2Accum = {
+    val acc = new Top2Accum
+    acc.first = Int.MinValue
+    acc.second = Int.MinValue
+    acc.oldFirst = Int.MinValue
+    acc.oldSecond = Int.MinValue
+    acc
+  }
+
+  def accumulate(acc: Top2Accum, v: Int) {
+    if (v > acc.first) {
+      acc.second = acc.first
+      acc.first = v
+    } else if (v > acc.second) {
+      acc.second = v
+    }
+  }
+
+  def emitUpdateWithRetract(
+    acc: Top2Accum,
+    out: RetractableCollector[JTuple2[JInteger, JInteger]])
+  : Unit = {
+    if (acc.first != acc.oldFirst) {
+      // if there is an update, retract old value then emit new value.
+      if (acc.oldFirst != Int.MinValue) {
+        out.retract(JTuple2.of(acc.oldFirst, 1))
+      }
+      out.collect(JTuple2.of(acc.first, 1))
+      acc.oldFirst = acc.first
+    }
+    if (acc.second != acc.oldSecond) {
+      // if there is an update, retract old value then emit new value.
+      if (acc.oldSecond != Int.MinValue) {
+        out.retract(JTuple2.of(acc.oldSecond, 2))
+      }
+      out.collect(JTuple2.of(acc.second, 2))
+      acc.oldSecond = acc.second
+    }
+  }
+}
+
+// init table
+val tab = ...
+
+// use function
+tab
+  .groupBy('key)
+  .flatAggregate(top2('a) as ('v, 'rank))
+  .select('key, 'v, 'rank)
+
+{% endhighlight %}
+</div>
+</div>
+
+
+{% top %}
+
 Best Practices for Implementing UDFs
 ------------------------------------
 
diff --git a/docs/fig/udtagg-mechanism.png b/docs/fig/udtagg-mechanism.png
new file mode 100644
index 0000000..d5e3b71
Binary files /dev/null and b/docs/fig/udtagg-mechanism.png differ

Reply via email to