[
https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17196180#comment-17196180
]
Aljoscha Krettek commented on FLINK-19167:
------------------------------------------
That's correct, I think. But in a real-world scenario that would likely happen.
> Proccess Function Example could not work
> ----------------------------------------
>
> Key: FLINK-19167
> URL: https://issues.apache.org/jira/browse/FLINK-19167
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.11.1
> Reporter: tinny cat
> Priority: Major
>
> Section "*Porccess Function Example*" of
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
> current is:
> {code:java}
> // Some comments here
> @Override
> public void processElement(
> Tuple2<String, String> value,
> Context ctx,
> Collector<Tuple2<String, Long>> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time
> timestamp
> current.lastModified = ctx.timestamp();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified +
> 60000);
> }
> @Override
> public void onTimer(
> long timestamp,
> OnTimerContext ctx,
> Collector<Tuple2<String, Long>> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> // this will be never happened
> if (timestamp == result.lastModified + 60000) {
> // emit the state on timeout
> out.collect(new Tuple2<String, Long>(result.key, result.count));
> }
> }
> {code}
> however, it should be:
> {code:java}
> @Override
> public void processElement(
> Tuple2<String, String> value,
> Context ctx,
> Collector<Tuple2<String, Long>> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time
> timestamp
> // it should be the previous watermark
> current.lastModified = ctx.timerService().currentWatermark();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified +
> 60000);
> }
> @Override
> public void onTimer(
> long timestamp,
> OnTimerContext ctx,
> Collector<Tuple2<String, Long>> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> if (timestamp == result.lastModified + 60000) {
> // emit the state on timeout
> out.collect(new Tuple2<String, Long>(result.key, result.count));
> }
> }
> {code}
> `current.lastModified = ctx.timestamp();` should be ` current.lastModified =
> ctx.timerService().currentWatermark();` otherwise, `timestamp ==
> result.lastModified + 60000` will be never happend
--
This message was sent by Atlassian Jira
(v8.3.4#803005)