chadrik commented on a change in pull request #12881:
URL: https://github.com/apache/beam/pull/12881#discussion_r491596716
##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1070,23 +1078,24 @@ def delayed_bundle_application(self,
return beam_fn_api_pb2.DelayedBundleApplication(
requested_time_delay=proto_deferred_watermark,
application=self.construct_bundle_application(
- op, current_watermark, element_and_restriction))
+ op.input_info, current_watermark, element_and_restriction))
def bundle_application(self,
op, # type: operations.DoOperation
primary # type: SplitResultPrimary
):
# type: (...) -> beam_fn_api_pb2.BundleApplication
- return self.construct_bundle_application(op, None, primary.primary_value)
+ assert op.input_info is not None
+ return self.construct_bundle_application(
+ op.input_info, None, primary.primary_value)
def construct_bundle_application(self,
- op, # type: operations.DoOperation
+ op_input_info, # type:
operations.OpInputInfo
Review comment:
Instead of passing around a `DoOperation` I switched this to a
`OpInputInfo` because the latter is all that we need, and doing so lets us
avoid having to assert that it is not `None` everywhere that it's used.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]