[
https://issues.apache.org/jira/browse/FLINK-19167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17195308#comment-17195308
]
Timo Walther commented on FLINK-19167:
--------------------------------------
[~tinny] the timestamp of {{onTimer}} will be the timestamp with which you
registered the timer. It will not be the watermark. So I also think that the
example is correct. If {{current.lastModified + 60000}} is already covered by
the current watermark when calling {{registerEventTimeTimer}}, the timer method
would fire immediately.
As Aljoscha said before, usually people don't need to think about watermarks in
ProcessFunction because this is rather an internal concept. Users can simply
work with event-time timestamps.
> Proccess Function Example could not work
> ----------------------------------------
>
> Key: FLINK-19167
> URL: https://issues.apache.org/jira/browse/FLINK-19167
> Project: Flink
> Issue Type: Bug
> Components: API / DataStream
> Affects Versions: 1.11.1
> Reporter: tinny cat
> Priority: Major
>
> Section "*Porccess Function Example*" of
> [https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/process_function.html]
> current is:
> {code:java}
> // Some comments here
> @Override
> public void processElement(
> Tuple2<String, String> value,
> Context ctx,
> Collector<Tuple2<String, Long>> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time
> timestamp
> current.lastModified = ctx.timestamp();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified +
> 60000);
> }
> @Override
> public void onTimer(
> long timestamp,
> OnTimerContext ctx,
> Collector<Tuple2<String, Long>> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> // this will be never happened
> if (timestamp == result.lastModified + 60000) {
> // emit the state on timeout
> out.collect(new Tuple2<String, Long>(result.key, result.count));
> }
> }
> {code}
> however, it should be:
> {code:java}
> @Override
> public void processElement(
> Tuple2<String, String> value,
> Context ctx,
> Collector<Tuple2<String, Long>> out) throws Exception {
> // retrieve the current count
> CountWithTimestamp current = state.value();
> if (current == null) {
> current = new CountWithTimestamp();
> current.key = value.f0;
> }
> // update the state's count
> current.count++;
> // set the state's timestamp to the record's assigned event time
> timestamp
> // it should be the previous watermark
> current.lastModified = ctx.timerService().currentWatermark();
> // write the state back
> state.update(current);
> // schedule the next timer 60 seconds from the current event time
> ctx.timerService().registerEventTimeTimer(current.lastModified +
> 60000);
> }
> @Override
> public void onTimer(
> long timestamp,
> OnTimerContext ctx,
> Collector<Tuple2<String, Long>> out) throws Exception {
> // get the state for the key that scheduled the timer
> CountWithTimestamp result = state.value();
> // check if this is an outdated timer or the latest timer
> if (timestamp == result.lastModified + 60000) {
> // emit the state on timeout
> out.collect(new Tuple2<String, Long>(result.key, result.count));
> }
> }
> {code}
> `current.lastModified = ctx.timestamp();` should be ` current.lastModified =
> ctx.timerService().currentWatermark();` otherwise, `timestamp ==
> result.lastModified + 60000` will be never happend
--
This message was sent by Atlassian Jira
(v8.3.4#803005)