Github user alpinegizmo commented on a diff in the pull request:
https://github.com/apache/flink/pull/4833#discussion_r144842788
--- Diff: docs/dev/stream/operators/windows.md ---
@@ -721,6 +808,111 @@ input
</div>
</div>
+#### Incremental Window Aggregation with AggregateFunction
+
+The following example shows how an incremental `AggregateFunction` can be
combined with
+a `ProcessWindowFunction` to compute the average and also emit the key and
window along with
+the average.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+DataStream<Tuple2<String, Long> input = ...;
+
+input
+ .keyBy(<key selector>)
+ .timeWindow(<window assigner>)
+ .aggregate(new AverageAggregate(), new MyProcessWindowFunction());
+
+// Function definitions
+
+/**
+ * The accumulator is used to keep a running sum and a count. The {@code
getResult} method
+ * computes the average.
+ */
+private static class AverageAggregate
+ implements AggregateFunction<Tuple2<String, Long>, Tuple2<Long, Long>,
Double> {
+ @Override
+ public Tuple2<Long, Long> createAccumulator() {
+ return new Tuple2<>(0L, 0L);
+ }
+
+ @Override
+ public Tuple2<Long, Long> add(
+ Tuple2<String, Long> value, Tuple2<Long, Long> accumulator) {
+ return new Tuple2<>(accumulator.f0 + value.f1, accumulator.f1 + 1L);
+ }
+
+ @Override
+ public Double getResult(Tuple2<Long, Long> accumulator) {
+ return accumulator.f0 / accumulator.f1;
+ }
+
+ @Override
+ public Tuple2<Long, Long> merge(
+ Tuple2<Long, Long> a, Tuple2<Long, Long> b) {
+ return new Tuple2<>(a.f0 + b.f0, a.f1 + b.f1);
+ }
+}
+
+private static class MyProcessWindowFunction
+ implements ProcessWindowFunction<Double, Tuple2<String, Double>,
String, TimeWindow> {
+
+ public void apply(String key,
+ Context context,
+ Iterable<Double> averages,
+ Collector<Tuple2<String, Double>> out) {
+ Double average = averags.iterator().next();
--- End diff --
This should be `averages`, not `averags`.
---