GitHub user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/4786#discussion_r143502884
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/KeyedProcessOperator.java
---
@@ -79,7 +79,6 @@ public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception
     @Override
     public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
-        collector.setAbsoluteTimestamp(timer.getTimestamp());
--- End diff --
This should call `eraseTimestamp()` rather than just dropping the call, because
the collector might still hold a timestamp that was set while processing a
previous element. The same applies to the other occurrences in the code.
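
To illustrate the concern, here is a minimal self-contained sketch. `SketchCollector` is a hypothetical stand-in that only borrows the method names `setAbsoluteTimestamp` and `eraseTimestamp` from the diff and the comment above; it is not Flink's actual collector, and the element, timestamp value, and `run` helper are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;

/** Illustration only: a hypothetical stand-in, not Flink code. */
public class TimestampLeakSketch {

    /** Minimal collector that tags every emitted value with the currently set timestamp. */
    static final class SketchCollector {
        private Long timestamp; // null means "no timestamp set"
        final List<String> emitted = new ArrayList<>();

        void setAbsoluteTimestamp(long ts) { this.timestamp = ts; }

        void eraseTimestamp() { this.timestamp = null; }

        void collect(String value) { emitted.add(value + "@" + timestamp); }
    }

    /** Simulates processing one element and then firing a processing-time timer. */
    static List<String> run(boolean eraseInProcessingTimeCallback) {
        SketchCollector collector = new SketchCollector();

        // Element processing: the element carries an (assumed) event timestamp of 42.
        collector.setAbsoluteTimestamp(42L);
        collector.collect("element");

        // Processing-time timer callback: the setAbsoluteTimestamp call has been removed.
        if (eraseInProcessingTimeCallback) {
            collector.eraseTimestamp();
        }
        collector.collect("timer-output");

        return collector.emitted;
    }

    public static void main(String[] args) {
        System.out.println(run(false)); // stale timestamp from the element is reused
        System.out.println(run(true));  // timestamp is cleared before emitting
    }
}
```

In this sketch, skipping the erase leaves the timer output tagged with the timestamp of the earlier element, while erasing first emits it with no timestamp, which is the behavior the comment asks for.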
---