chadrik commented on a change in pull request #12881:
URL: https://github.com/apache/beam/pull/12881#discussion_r491735659
##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
##########
@@ -174,11 +191,14 @@ def __init__(self,
self._state_handler_factory = GrpcStateHandlerFactory(
self._state_cache, credentials)
self._profiler_factory = profiler_factory
- self._fns = KeyedDefaultDict(
- lambda id: self._control_stub.GetProcessBundleDescriptor(
- beam_fn_api_pb2.GetProcessBundleDescriptorRequest(
- process_bundle_descriptor_id=id))
- ) # type: Mapping[str, beam_fn_api_pb2.ProcessBundleDescriptor]
+
+ def default_factory(id):
+ # type: (str) -> beam_fn_api_pb2.ProcessBundleDescriptor
+ return self._control_stub.GetProcessBundleDescriptor(
+ beam_fn_api_pb2.GetProcessBundleDescriptorRequest(
+ process_bundle_descriptor_id=id))
+
+ self._fns = KeyedDefaultDict(default_factory)
Review comment:
Changing this to a function lets us add type annotations, which silences a
mypy error: mypy can now infer the key/value types of `KeyedDefaultDict`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org