GitHub user aljoscha commented on a diff in the pull request:
https://github.com/apache/flink/pull/5917#discussion_r187865922
--- Diff:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ProcessingTimeTrigger.java
---
@@ -60,7 +60,12 @@ public boolean canMerge() {
@Override
public void onMerge(TimeWindow window,
OnMergeContext ctx) {
- ctx.registerProcessingTimeTimer(window.maxTimestamp());
+ // only register a new timer if the current watermark is less than the merged window's max timestamp;
+ // otherwise the window will be fired immediately by the call to onElement
+ long windowMaxTimestamp = window.maxTimestamp();
+ if (windowMaxTimestamp > ctx.getCurrentWatermark()) {
--- End diff --
This should use `ctx.getCurrentProcessingTime()` rather than the watermark, since this is a processing-time trigger. I'll fix while merging.
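
To illustrate the suggested change, here is a minimal sketch of the guard comparing against processing time instead of the watermark. It is not the merged code: `MergeContext` and `RecordingContext` are hypothetical stand-ins for the small part of Flink's `OnMergeContext` used here, so the example runs without Flink on the classpath, and the window is reduced to its max timestamp.

```java
import java.util.ArrayList;
import java.util.List;

public class ProcessingTimeMergeSketch {

    /** Hypothetical stand-in for the subset of OnMergeContext this check needs. */
    interface MergeContext {
        long getCurrentProcessingTime();
        void registerProcessingTimeTimer(long time);
    }

    /** Hypothetical test double: fixed clock, records every registered timer. */
    static final class RecordingContext implements MergeContext {
        private final long now;
        final List<Long> timers = new ArrayList<>();

        RecordingContext(long now) {
            this.now = now;
        }

        @Override
        public long getCurrentProcessingTime() {
            return now;
        }

        @Override
        public void registerProcessingTimeTimer(long time) {
            timers.add(time);
        }
    }

    /**
     * Registers a timer for the merged window only if its end is still in the
     * future in processing time. If the end has already passed, no timer is
     * registered here, on the assumption that onElement fires the window.
     */
    static void onMerge(long windowMaxTimestamp, MergeContext ctx) {
        if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
            ctx.registerProcessingTimeTimer(windowMaxTimestamp);
        }
    }

    public static void main(String[] args) {
        RecordingContext open = new RecordingContext(1_000L);
        onMerge(1_999L, open);
        System.out.println(open.timers); // [1999]

        RecordingContext expired = new RecordingContext(2_500L);
        onMerge(1_999L, expired);
        System.out.println(expired.timers); // []
    }
}
```

The watermark tracks event time, so comparing it with the window end in a processing-time trigger mixes two unrelated clocks; the sketch keeps both sides of the comparison in processing time.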
---