GitHub user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/6351#discussion_r204345031
--- Diff:
flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/SequenceGeneratorSource.java
---
@@ -197,6 +210,11 @@ public void initializeState(FunctionInitializationContext context) throws Except
for (KeyRangeStates keyRange : snapshotKeyRanges.get()) {
keyRanges.add(keyRange);
}
+
+ // let event time start from the max of all event time progress across subtasks in the last execution
+ for (Long lastEventTime : lastEventTimes.get()) {
+ monotonousEventTime = Math.max(monotonousEventTime, lastEventTime);
--- End diff --
Yes, I think it should be fine because it is just for the generator. Just
wanted to double check.
---
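For readers following the thread, the restore logic in the hunk above can be tried in isolation. This is only a minimal sketch, not the code from the pull request: the class `EventTimeRestoreSketch` and the method `restoreEventTime` are hypothetical names, and a plain `Iterable<Long>` stands in for whatever `lastEventTimes.get()` returns, on the assumption that it yields the last event time recorded by each subtask of the previous execution.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-alone sketch; not the actual SequenceGeneratorSource code.
class EventTimeRestoreSketch {

    // Returns the largest restored event time, or the initial value
    // when no restored value exceeds it (including an empty restore).
    static long restoreEventTime(long initialEventTime, Iterable<Long> lastEventTimes) {
        long monotonousEventTime = initialEventTime;
        for (Long lastEventTime : lastEventTimes) {
            // Same step as in the diff: keep the running maximum.
            monotonousEventTime = Math.max(monotonousEventTime, lastEventTime);
        }
        return monotonousEventTime;
    }

    public static void main(String[] args) {
        // Assumed example values: last event times from three earlier subtasks.
        List<Long> restored = Arrays.asList(1_000L, 4_500L, 3_200L);
        System.out.println(restoreEventTime(0L, restored)); // prints 4500
    }
}
```

Starting from the maximum means no subtask emits an event time lower than one already produced before the restore, at the cost of some subtasks skipping ahead. As the comment above notes, that trade-off is considered acceptable here because the source is only a test generator.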