chadrik commented on a change in pull request #12881:
URL: https://github.com/apache/beam/pull/12881#discussion_r494687447
##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
##########
@@ -622,10 +659,12 @@ def process_bundle_progress_metadata_request(self,
request,  # type: beam_fn_api_pb2.ProcessBundleProgressMetadataRequest
instruction_id # type: str
):
+ # type: (...) -> beam_fn_api_pb2.InstructionResponse
return beam_fn_api_pb2.InstructionResponse(
instruction_id=instruction_id,
- process_bundle_progress=beam_fn_api_pb2.
+ process_bundle_progress_metadata=beam_fn_api_pb2.
ProcessBundleProgressMetadataResponse(
+ # FIXME: incompatible type "List[MonitoringInfo]"; expected "Optional[Mapping[str, Any]]"
monitoring_info=SHORT_ID_CACHE.getInfos(
request.monitoring_info_id)))
Review comment:
Note this change and the FIXME. I'm not sure what to do about either.
The type checker complained that `InstructionResponse(process_bundle_progress=...)`
was receiving the wrong type, so I switched the keyword to
`process_bundle_progress_metadata`. I'm not sure whether this is the right
resolution.
`ProcessBundleProgressMetadataResponse(monitoring_info=...)` is receiving the
wrong type as well: it's supposed to be a mapping, not a list. Should I
change `SHORT_ID_CACHE.getInfos` to return a dictionary?
Do you know if this code is being hit? If so, any theories on how it's
working?
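
To make the question concrete, here is a minimal sketch of the list-returning and dictionary-returning shapes side by side. It is not the real `SHORT_ID_CACHE` code: `MonitoringInfo`, `ShortIdCache`, `put`, `get_infos_list` and `get_infos_mapping` are stand-in names I made up for illustration, and I'm assuming the response field wants short ids as keys.

```python
from dataclasses import dataclass
from typing import Dict, Iterable, List


@dataclass(frozen=True)
class MonitoringInfo:
  """Stand-in for the MonitoringInfo proto (hypothetical, illustration only)."""
  urn: str


class ShortIdCache:
  """Toy cache from short id to monitoring info (not the real Beam class)."""
  def __init__(self) -> None:
    self._short_id_to_info: Dict[str, MonitoringInfo] = {}

  def put(self, short_id: str, info: MonitoringInfo) -> None:
    self._short_id_to_info[short_id] = info

  def get_infos_list(self, short_ids: Iterable[str]) -> List[MonitoringInfo]:
    # List shape: the caller has to rely on ordering to recover the ids.
    return [self._short_id_to_info[s] for s in short_ids]

  def get_infos_mapping(
      self, short_ids: Iterable[str]) -> Dict[str, MonitoringInfo]:
    # Mapping shape: each info stays attached to the id that requested it,
    # which is what a Mapping[str, ...] parameter would accept.
    return {s: self._short_id_to_info[s] for s in short_ids}
```

If the mapping shape is the right one, the call site would only need to pass the dictionary through unchanged.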
##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -966,6 +976,7 @@ def process_bundle(self, instruction_id):
output_stream = self.timer_data_channel.output_timer_stream(
instruction_id, transform_id, timer_family_id)
timer_info.output_stream = output_stream
+ # FIXME: how do we know that self.ops[transform_id] is a DoOperation?
self.ops[transform_id].add_timer_info(timer_family_id, timer_info)
Review comment:
`BundleProcessor.ops` is typed as `OrderedDict[str, operations.Operation]`.
Should it be `OrderedDict[str, operations.DoOperation]`? Or do the transform
ids in `BundleProcessor.timers_info` all point to `DoOperation` instances?
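
If the answer is the latter, one option is to narrow the type at the call site rather than change the annotation on `ops`. A rough sketch, using stand-in `Operation` and `DoOperation` classes and a hypothetical helper `add_timer_info_checked` (none of this is the actual Beam code):

```python
from collections import OrderedDict
from typing import Any, Dict


class Operation:
  """Stand-in for operations.Operation (illustration only)."""


class DoOperation(Operation):
  """Stand-in for operations.DoOperation; only it knows about timers."""
  def __init__(self) -> None:
    self.timer_infos: Dict[str, Any] = {}

  def add_timer_info(self, timer_family_id: str, timer_info: Any) -> None:
    self.timer_infos[timer_family_id] = timer_info


def add_timer_info_checked(
    ops: "OrderedDict[str, Operation]",
    transform_id: str,
    timer_family_id: str,
    timer_info: Any) -> None:
  op = ops[transform_id]
  # The isinstance check narrows Operation to DoOperation for the type
  # checker and fails loudly if the assumption about timers is wrong.
  if not isinstance(op, DoOperation):
    raise TypeError(
        'Expected a DoOperation for transform %r, got %s' %
        (transform_id, type(op).__name__))
  op.add_timer_info(timer_family_id, timer_info)
```

This keeps `ops` general while documenting, and enforcing at runtime, the assumption that timers only ever attach to `DoOperation`s.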