[ 
https://issues.apache.org/jira/browse/BEAM-14244?focusedWorklogId=753393&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-753393
 ]

ASF GitHub Bot logged work on BEAM-14244:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 06/Apr/22 13:32
            Start Date: 06/Apr/22 13:32
    Worklog Time Spent: 10m 
      Work Description: steveniemitz commented on code in PR #17262:
URL: https://github.com/apache/beam/pull/17262#discussion_r843954794


##########
runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/DoFnOperator.java:
##########
@@ -1025,13 +1025,26 @@ protected void fireTimer(TimerData timerData) {
     checkArgument(namespace instanceof WindowNamespace);
     BoundedWindow window = ((WindowNamespace) namespace).getWindow();
     timerInternals.onFiredOrDeletedTimer(timerData);
+    Instant effectiveOutputTimestamp;
+
+    if (timerData.getDomain() == TimeDomain.EVENT_TIME) {
+      effectiveOutputTimestamp = timerData.getOutputTimestamp();
+    } else {
+      // Flink does not set a watermark hold for the timer's output timestamp, and previous to
+      // https://github.com/apache/beam/pull/17262 processing time timers did not correctly emit
+      // elements at their output timestamp.  In this case we need to continue doing the wrong thing
+      // and using the output watermark rather than the firing timestamp.  Once flink correctly sets
+      // a  watermark hold for the output timestamp, this should be changed back.
+      effectiveOutputTimestamp = timerInternals.currentOutputWatermarkTime();

Review Comment:
   Should it?  Didn't we say we wanted to use the output watermark instead?  I 
can certainly change it to use the input watermark, but I figured I'd make it 
slightly less wrong while I was in here.





Issue Time Tracking
-------------------

    Worklog Id:     (was: 753393)
    Time Spent: 2h 20m  (was: 2h 10m)

> Processing time timers should use outputTimestamp rather than input watermark 
> for their timestamp
> -------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-14244
>                 URL: https://issues.apache.org/jira/browse/BEAM-14244
>             Project: Beam
>          Issue Type: Bug
>          Components: runner-core, sdk-java-core
>            Reporter: Steve Niemitz
>            Assignee: Steve Niemitz
>            Priority: P1
>          Time Spent: 2h 20m
>  Remaining Estimate: 0h
>
> Currently, processing time timers ignore the outputTimestamp and instead use 
> the input watermark at the time they fire.  This is wrong because the input 
> watermark can have advanced arbitrarily far past the timer's output timestamp 
> by the time the timer fires.
> The correct behavior is to use the outputTimestamp the timer was configured 
> with.
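
To make the difference described in the issue concrete, here is a minimal, self-contained sketch. It is not Beam code: `TimerTimestampSketch`, `ProcessingTimeTimer`, `reportedTimestamp`, and `requestedTimestamp` are hypothetical names, all timestamps are made-up values, and `java.time.Instant` stands in for the Joda-Time `Instant` that Beam uses so that the example needs no dependencies.

```java
import java.time.Duration;
import java.time.Instant;

public class TimerTimestampSketch {

  /** Hypothetical, simplified stand-in for a processing time timer; not Beam's TimerData. */
  static final class ProcessingTimeTimer {
    final Instant fireTimestamp;   // processing time at which the timer is scheduled to fire
    final Instant outputTimestamp; // event-time timestamp the timer was configured to emit at

    ProcessingTimeTimer(Instant fireTimestamp, Instant outputTimestamp) {
      this.fireTimestamp = fireTimestamp;
      this.outputTimestamp = outputTimestamp;
    }
  }

  /** Behavior reported in the issue: the configured output timestamp is ignored. */
  static Instant reportedTimestamp(ProcessingTimeTimer timer, Instant inputWatermarkAtFiring) {
    return inputWatermarkAtFiring;
  }

  /** Behavior the issue asks for: use the configured output timestamp. */
  static Instant requestedTimestamp(ProcessingTimeTimer timer, Instant inputWatermarkAtFiring) {
    return timer.outputTimestamp;
  }

  public static void main(String[] args) {
    ProcessingTimeTimer timer = new ProcessingTimeTimer(
        Instant.parse("2022-04-06T13:30:00Z"),
        Instant.parse("2022-04-06T13:00:00Z"));
    // Illustrative value: the input watermark has moved on by the time the timer fires.
    Instant inputWatermarkAtFiring = Instant.parse("2022-04-06T13:45:00Z");

    Instant reported = reportedTimestamp(timer, inputWatermarkAtFiring);
    Instant requested = requestedTimestamp(timer, inputWatermarkAtFiring);

    System.out.println("reported:  " + reported);
    System.out.println("requested: " + requested);
    System.out.println("drift:     " + Duration.between(requested, reported));
  }
}
```

The gap between the two results is whatever distance the input watermark has travelled past the configured output timestamp, which is unbounded in general. The sketch deliberately leaves out the Flink-specific fallback from the diff above, where the output watermark is used because no watermark hold is set for the timer's output timestamp.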



