Github user bowenli86 commented on a diff in the pull request:
https://github.com/apache/flink/pull/4833#discussion_r146134312
--- Diff: docs/dev/stream/operators/windows.md ---
@@ -721,6 +808,111 @@ input
</div>
</div>
+#### Incremental Window Aggregation with AggregateFunction
+
+The following example shows how an incremental `AggregateFunction` can be
combined with
+a `ProcesWindowFunction` to compute the average and also emit the key and
window along with
+the average.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Tuple2<String, Long> input = ...;
+
+input
+ .keyBy(<key selector>)
+ .timeWindow(<window assigner>)
+ .aggregate(new AverageAggregate(), new MyProcessWindowFunction());
+
+// Function definitions
+
+/**
+ * The accumulator is used to keep a running sum and a count. The {@code
getResult} method
+ * computes the average.
+ */
+private static class AverageAggregate
+ implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>,
Double> {
+ @Override
+ public Tuple2<Long, Long> createAccumulator() {
+ return new Tuple2<>(0L, 0L);
+ }
+
+ @Override
+ public Tuple2<Long, Long> add(
+ Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
+ return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
+ }
+
+ @Override
+ public Double getResult(Tuple2<Long, Long> accumulator) {
+ return accumulator.f0 / accumulator.f1;
+ }
+
+ @Override
+ public Tuple2<Long, Long> merge(
+ Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
--- End diff --
ditto
---