mxm commented on a change in pull request #11492:
URL: https://github.com/apache/beam/pull/11492#discussion_r414504120
##########
File path: sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -845,10 +845,12 @@ def process_bundle(self,
                 (result_future.is_done() and result_future.get().error)):
           if isinstance(output, beam_fn_api_pb2.Elements.Timers) and not dry_run:
             with BundleManager._lock:
-              self.bundle_context_manager.get_buffer(
+              timer_buffer = self.bundle_context_manager.get_buffer(
                   expected_output_timers[(
                       output.transform_id, output.timer_family_id)],
-                  output.transform_id).append(output.timers)
+                  output.transform_id)
+              timer_buffer.cleared = False
Review comment:
Where can I find this assumption? Note that the tests are passing.
I think it makes sense to reset the flag: when a timer is set again after
firing, we should treat it independently of any prior setting of that timer.
We may want to leave a comment here explaining why this is necessary.
Alternatively, the flag could be reset directly after the timer is fired.
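
To make the two options concrete, here is a minimal sketch. It assumes the buffer rejects appends once `cleared` is set; that behavior is an assumption, as are the names `SketchBuffer`, `record_timers`, and `fire_timers`, which are hypothetical and not the runner's actual code.

```python
class SketchBuffer:
    """Hypothetical stand-in for the buffer returned by get_buffer()."""

    def __init__(self):
        self.items = []
        self.cleared = False

    def append(self, item):
        # Assumption: a cleared buffer rejects further appends.
        if self.cleared:
            raise RuntimeError("append to a cleared buffer")
        self.items.append(item)

    def clear(self):
        self.items = []
        self.cleared = True


def record_timers(buffer, timers):
    """Option 1 (as in the diff): reset the flag when a timer is set."""
    # A timer set after firing is independent of earlier settings,
    # so the stale `cleared` state must not block it.
    buffer.cleared = False
    buffer.append(timers)


def fire_timers(buffer):
    """Option 2: reset the flag directly after the timers are fired."""
    fired = list(buffer.items)
    buffer.clear()
    buffer.cleared = False
    return fired
```

With option 2 the write path would not need to touch the flag at all, which may be easier to reason about, but every place that fires timers would have to do the reset.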