boyuanzz commented on a change in pull request #11492:
URL: https://github.com/apache/beam/pull/11492#discussion_r414734137
##########
File path:
sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
##########
@@ -845,10 +845,12 @@ def process_bundle(self,
(result_future.is_done() and result_future.get().error)):
if isinstance(output, beam_fn_api_pb2.Elements.Timers) and not dry_run:
with BundleManager._lock:
- self.bundle_context_manager.get_buffer(
+ timer_buffer = self.bundle_context_manager.get_buffer(
expected_output_timers[(
output.transform_id, output.timer_family_id)],
- output.transform_id).append(output.timers)
+ output.transform_id)
+ timer_buffer.cleared = False
Review comment:
@pabloem who is the original author of `ListBuffer`. I prefer the second
way Max mentioned above. What do you think? Other than this part, timer changes
look good to me.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]