pabloem commented on a change in pull request #15441:
URL: https://github.com/apache/beam/pull/15441#discussion_r723713499



##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -471,9 +559,17 @@ def _collect_written_timers(
             timer_watermark_data[(transform_id, timer_family_id)] = min(
                 timer_watermark_data[(transform_id, timer_family_id)],
                 decoded_timer.hold_timestamp)
-        newly_set_timers[(transform_id, timer_family_id)] = ListBuffer(
-            coder_impl=timer_coder_impl)
-        newly_set_timers[(transform_id, timer_family_id)].append(out.get())
+          else:
+            # Timer was cleared, so we must skip setting it below.
+            timer_cleared = True
+            continue
+        if timer_cleared or (transform_id,

Review comment:
   See [lines 546-548](https://github.com/apache/beam/pull/15441/files#diff-4543c398320fcd44da53222a337b5cbe7ed30cfaac8d5509b2a079bd27d6147aR546-R548): we decode all the timers that have been written and key them by `(key, window)` in a dictionary. Note that if there are multiple timers for the same `(key, window)`, only the latest one is saved in the `timers_by_key_and_window` dictionary.
   
   Then, in the [loop starting at line 551](https://github.com/apache/beam/pull/15441/files#diff-4543c398320fcd44da53222a337b5cbe7ed30cfaac8d5509b2a079bd27d6147aR551-R565), we read the latest timer action for each `(key, window)`.
   
   So we only apply the latest action for each `(key, window)`, whether that action is a clear or not.
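
   To illustrate the last-write-wins behavior described above, here is a minimal sketch. It is not the code in `fn_runner.py`: the `Timer` tuple, its field names, and the `latest_timer_actions` helper are hypothetical stand-ins chosen for the example; only the `timers_by_key_and_window` name comes from the PR.

```python
from collections import namedtuple

# Hypothetical stand-in for a decoded timer; not Beam's actual timer class.
Timer = namedtuple("Timer", ["user_key", "window", "clear_bit", "hold_timestamp"])


def latest_timer_actions(written_timers):
    """Return a dict holding only the last timer written per (key, window)."""
    timers_by_key_and_window = {}
    for timer in written_timers:
        # A later write for the same (key, window) overwrites the earlier one.
        timers_by_key_and_window[(timer.user_key, timer.window)] = timer
    return timers_by_key_and_window


# Timers in the order they were written (assumed example data).
writes = [
    Timer("a", "w1", False, 10),
    Timer("a", "w1", True, None),  # latest action for ("a", "w1") is a clear
    Timer("b", "w1", True, None),
    Timer("b", "w1", False, 7),    # latest action for ("b", "w1") is a set
]

latest = latest_timer_actions(writes)
for (key, window), timer in latest.items():
    action = "clear" if timer.clear_bit else "set"
    print(key, window, action)
```

   With this input, the earlier set for `("a", "w1")` and the earlier clear for `("b", "w1")` are both dropped, because a later write to the same dictionary key replaces them.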




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


