alpinegizmo commented on a change in pull request #13384:
URL: https://github.com/apache/flink/pull/13384#discussion_r490241063
##########
File path: docs/dev/libs/state_processor_api.md
##########
@@ -345,6 +345,171 @@ Along with reading registered state values, each key has access to a `Context` w
{% panel **Note:** When using a `KeyedStateReaderFunction`, all state descriptors must be registered eagerly inside of `open`. Any attempt to call `RuntimeContext#get*State` will result in a `RuntimeException`. %}
+### Window State
+
+The State Processor API supports reading state from a [window operator]({{ site.baseurl }}/dev/stream/operators/windows.html).
+When reading window state, users specify the operator uid, the window assigner, and the aggregation type.
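
The window assigner is what maps each record's timestamp to a window. The sketch below is plain Java with no Flink dependency; `TumblingWindowSketch`, `windowStart`, and `window` are hypothetical names, and it assumes a zero offset and non-negative epoch-millisecond timestamps. Under those assumptions the start computation should match what Flink's `TimeWindow.getWindowStartWithOffset` returns.

```java
import java.util.Arrays;

/**
 * Hypothetical, Flink-free sketch of tumbling window assignment.
 * Assumes a zero offset and non-negative epoch-millisecond timestamps.
 * A window covers [start, start + size).
 */
public class TumblingWindowSketch {

    /** Start of the tumbling window that contains the given timestamp. */
    static long windowStart(long timestamp, long sizeMillis) {
        return timestamp - (timestamp % sizeMillis);
    }

    /** Returns {start, endExclusive} for the window that contains the timestamp. */
    static long[] window(long timestamp, long sizeMillis) {
        long start = windowStart(timestamp, sizeMillis);
        return new long[] {start, start + sizeMillis};
    }

    public static void main(String[] args) {
        long ts = 90_005L; // 90.005 seconds after the epoch
        // One-minute windows: prints [60000, 120000]
        System.out.println(Arrays.toString(window(ts, 60_000L)));
        // Ten-millisecond windows: prints [90000, 90010]
        System.out.println(Arrays.toString(window(ts, 10L)));
    }
}
```

The same timestamp lands in a different window under a 1 minute size than under a 10 ms size, so the size is part of what identifies the windows a job writes to state.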
+
+Additionally, a `WindowReaderFunction` can be specified to enrich each read with additional information, similar to a `WindowFunction` or `ProcessWindowFunction`.
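
The enrichment hook is a per-window callback. As a rough, Flink-free sketch of that shape, `WindowCallback` below is a hypothetical stand-in rather than the real `WindowReaderFunction` signature: it is called once per key and window with that window's contents and pushes any number of results to an output sink. Assuming the window pre-aggregates, the contents are taken to be a single value.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical sketch of a per-window reader callback. WindowCallback is NOT
 * Flink's WindowReaderFunction; it only mirrors the idea of receiving
 * (key, window, elements) and emitting enriched results.
 */
public class ReaderCallbackSketch {

    interface WindowCallback<IN, OUT, K, W> {
        void readWindow(K key, W window, Iterable<IN> elements, Consumer<OUT> out);
    }

    /** Enriches a per-window count with its key and a window label. */
    static final WindowCallback<Integer, String, String, String> DESCRIBE =
        (key, window, elements, out) -> {
            // Assumes a pre-aggregating window, so there is exactly one value
            int count = elements.iterator().next();
            out.accept(key + " @ " + window + " -> " + count);
        };

    public static void main(String[] args) {
        List<String> results = new ArrayList<>();
        DESCRIBE.readWindow("alice", "[60000, 120000)",
            Collections.singletonList(3), results::add);
        System.out.println(results); // [alice @ [60000, 120000) -> 3]
    }
}
```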
+
+Suppose a DataStream application counts the number of clicks per user per minute.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+class Click {
+ public String userId;
+
+ public LocalDateTime time;
+}
+
+class ClickCounter implements AggregateFunction<Click, Integer, Integer> {
+
+ @Override
+ public Integer createAccumulator() {
+ return 0;
+ }
+
+ @Override
+ public Integer add(Click value, Integer accumulator) {
+ return 1 + accumulator;
+ }
+
+ @Override
+ public Integer getResult(Integer accumulator) {
+ return accumulator;
+ }
+
+ @Override
+ public Integer merge(Integer a, Integer b) {
+ return a + b;
+ }
+}
+
+DataStream<Click> clicks = . . .
+
+clicks
+ .keyBy(click -> click.userId)
+ .window(TumblingEventTimeWindows.of(Time.minutes(1)))
+ .aggregate(new ClickCounter())
+ .uid("click-window")
+ .addSink(new Sink());
+
+{% endhighlight %}
+</div>
+<div data-lang="scala" markdown="1">
+{% highlight scala %}
+
+import java.lang.{Integer => JInteger}
+
+case class Click(userId: String, time: LocalDateTime)
+
+class ClickCounter extends AggregateFunction[Click, JInteger, JInteger] {
+
+ override def createAccumulator(): JInteger = 0
+
+ override def add(value: Click, accumulator: JInteger): JInteger = 1 + accumulator
+
+ override def getResult(accumulator: JInteger): JInteger = accumulator
+
+ override def merge(a: JInteger, b: JInteger): JInteger = a + b
+}
+
+val clicks: DataStream[Click] = . . .
+
+clicks
+ .keyBy(click => click.userId)
+ .window(TumblingEventTimeWindows.of(Time.minutes(1)))
+ .aggregate(new ClickCounter())
+ .uid("click-window")
+ .addSink(new Sink())
+
+{% endhighlight %}
+</div>
+</div>
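
To make the resulting state concrete, the following plain-Java sketch (no Flink involved; `ClickCountSketch` and `countPerUserPerMinute` are hypothetical names) keeps one `Integer` per user and one-minute tumbling window, updated the way `ClickCounter` does. It assumes non-negative epoch-millisecond timestamps and encodes each (user, window start) pair as a string key for simplicity.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical, Flink-free sketch of the per-user, per-minute click counts
 * the window operator accumulates. Timestamps are assumed to be
 * non-negative epoch milliseconds.
 */
public class ClickCountSketch {

    static final long MINUTE = 60_000L;

    /** Returns "userId@windowStart" -> click count, sorted by key. */
    static Map<String, Integer> countPerUserPerMinute(String[] userIds, long[] timestamps) {
        Map<String, Integer> state = new TreeMap<>();
        for (int i = 0; i < userIds.length; i++) {
            long windowStart = timestamps[i] - (timestamps[i] % MINUTE);
            String key = userIds[i] + "@" + windowStart;
            // Equivalent to createAccumulator() (0) followed by add() (1 + accumulator)
            state.merge(key, 1, Integer::sum);
        }
        return state;
    }

    public static void main(String[] args) {
        String[] users = {"alice", "alice", "bob", "alice"};
        long[] times = {5_000L, 59_999L, 10_000L, 60_000L};
        // Prints {alice@0=2, alice@60000=1, bob@0=1}
        System.out.println(countPerUserPerMinute(users, times));
    }
}
```

Each map entry stands for one key and window pair; in the real job, that single aggregated value is what a reader such as `ClickReader` is expected to find in `elements`.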
+
+This state can be read with the following code.
+
+<div class="codetabs" markdown="1">
+<div data-lang="java" markdown="1">
+{% highlight java %}
+
+class ClickState {
+
+ public String userId;
+
+ public int count;
+
+ public TimeWindow window;
+
+ public Set<Long> triggerTimers;
+}
+
+class ClickReader extends WindowReaderFunction<Integer, ClickState, String, TimeWindow> {
+
+ @Override
+ public void readWindow(String key, Context<TimeWindow> context, Iterable<Integer> elements, Collector<ClickState> out) {
+ ClickState state = new ClickState();
+ state.userId = key;
+ state.count = elements.iterator().next();
+ state.window = context.window();
+ state.triggerTimers = context.registeredEventTimeTimers();
+
+ out.collect(state);
+ }
+}
+
+ExecutionEnvironment batchEnv = ExecutionEnvironment.getExecutionEnvironment();
+ExistingSavepoint savepoint = Savepoint.load(batchEnv, "hdfs://checkpoint-dir", new MemoryStateBackend());
+
+savepoint
+ .window(TumblingEventTimeWindows.of(Time.milliseconds(10)))
Review comment:
Does the duration even matter here?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.