Github user bowenli86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/4833#discussion_r146134241
--- Diff: docs/dev/stream/operators/windows.md ---
@@ -427,6 +427,93 @@ input
The above example sums up the second fields of the tuples for all elements
in a window.
+### AggregateFunction
+
+An `AggregateFunction` is a generalized version of a `ReduceFunction` that
has three types: an
+input type (`IN`), accumulator type (`ACC`), and an output type (`OUT`).
The input type is the type
+of elements in the input stream and the `AggregateFunction` has a method
for adding one input
+element to an accumulator. The interface also has methods for creating an
initial accumulator,
+for merging two accumulators into one accumulator and for extracting an
output (of type `OUT`) from
+an accumulator. We will see how this works in the example below.
+
+Same as with `ReduceFunction`, Flink will incrementally aggregate input
elements of a window as they
+arrive.
+
+A `AggregateFunction` can be defined and used like this:
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+/**
+ * The accumulator is used to keep a running sum and a count. The {@code
getResult} method
+ * computes the average.
+ */
+private static class AverageAggregate
+ implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>,
Double> {
+ @Override
+ public Tuple2<Long, Long> createAccumulator() {
+ return new Tuple2<>(0L, 0L);
+ }
+
+ @Override
+ public Tuple2<Long, Long> add(
+ Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
+ return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
+ }
+
+ @Override
+ public Double getResult(Tuple2<Long, Long> accumulator) {
+ return accumulator.f0 / accumulator.f1;
+ }
+
+ @Override
+ public Tuple2<Long, Long> merge(
+ Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
--- End diff --
ditto
---