hililiwei commented on a change in pull request #17106:
URL: https://github.com/apache/flink/pull/17106#discussion_r721937206



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousProcessingTimeTrigger.java
##########
@@ -74,6 +74,11 @@ public TriggerResult onEventTime(long time, W window, 
TriggerContext ctx) throws
     @Override
     public TriggerResult onProcessingTime(long time, W window, TriggerContext 
ctx)
             throws Exception {
+
+        if (time == window.maxTimestamp()) {

Review comment:
       Sorry for the late reply. When processElement is called, it will 
register a CleanupTimer, which will use window.maxTimestamp(). When this time 
is triggered(onProcessingTime(window.maxTimestamp(), window, ctx),  it does not 
fire the window, and the state was cleared 
anyway.(org.apache.flink.streaming.runtime.operators.windowing.WindowOperator#registerCleanupTimer)
   
    I think `long nextFireTimestamp = Math.min(start + interval, 
window.maxTimestamp());` is reasonable, and make window.max window easier to 
understand. But in this way, we may have to modify several places, such as 
ContinuousProcessingTimeTrigger#onElement #onProcessingTime. In addition, 
ContinuousEventTimeTrigger is  also determines whether to fire the window 
through `time == window.maxTimestamp()` at first, modify it together?
   
https://github.com/apache/flink/blob/ac203a7a26d78fe270bd9941bb721be53a893040/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/windowing/triggers/ContinuousEventTimeTrigger.java#L76
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to