hililiwei commented on a change in pull request #17106:
URL: https://github.com/apache/flink/pull/17106#discussion_r721937206
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
##########
@@ -74,6 +74,11 @@ public TriggerResult onEventTime(long time, W window,
TriggerContext ctx) throws
@Override
public TriggerResult onProcessingTime(long time, W window, TriggerContext
ctx)
throws Exception {
+
+ if (time == window.maxTimestamp()) {
Review comment:
Sorry for the late reply. When processElement is called, it will
register a CleanupTimer, which will use window.maxTimestamp(). When this time
is triggered(onProcessingTime(window.maxTimestamp(), window, ctx), it does not
fire the window, and the state was cleared anyway.
https://github.com/apache/flink/blob/ac203a7a26d78fe270bd9941bb721be53a893040/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/operators/windowing/WindowOperator.java#L621-L633
I think `long nextFireTimestamp = Math.min(start + interval,
window.maxTimestamp());` is reasonable, and make window.max window easier to
understand. But in this way, we may have to modify several places, such as
ContinuousProcessingTimeTrigger#onElement #onProcessingTime. In addition,
ContinuousEventTimeTrigger is also determines whether to fire the window
through `time == window.maxTimestamp()` at first, modify it together?
https://github.com/apache/flink/blob/ac203a7a26d78fe270bd9941bb721be53a893040/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java#L76-L78
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]