chadrik commented on a change in pull request #12881:
URL: https://github.com/apache/beam/pull/12881#discussion_r491722757
##########
File path: sdks/python/apache_beam/runners/worker/statecache.py
##########
@@ -44,15 +48,17 @@ class Metrics(object):
PREFIX = "beam:metric:statecache:"
def __init__(self):
+ # type: () -> None
self._context = threading.local()
def initialize(self):
+ # type: () -> None
+
"""Needs to be called once per thread to initialize the local metrics
cache.
"""
if hasattr(self._context, 'metrics'):
return # Already initialized
- self._context.metrics = collections.defaultdict(
- int) # type: DefaultDict[Hashable, int]
Review comment:
Removed this annotation because it's invalid: mypy doesn't allow
annotating an attribute of another object (i.e. a non-`self` attribute).
This makes it clear there's a real need for a typed version of `threading.local`.
##########
File path: sdks/python/apache_beam/runners/worker/log_handler.py
##########
@@ -156,9 +174,11 @@ def _write_log_entries(self):
done = True
log_entries.pop()
if log_entries:
- yield beam_fn_api_pb2.LogEntry.List(log_entries=log_entries)
+ yield beam_fn_api_pb2.LogEntry.List(
+ log_entries=cast(List[beam_fn_api_pb2.LogEntry], log_entries))
Review comment:
We have to cast here because `log_entries` starts out as a
`List[Union[..., Sentinel]]`. Just above this line we check for the
sentinel and pop it off, but mypy can't track that narrowing.
##########
File path: sdks/python/apache_beam/runners/worker/log_handler.py
##########
@@ -156,9 +174,11 @@ def _write_log_entries(self):
done = True
log_entries.pop()
if log_entries:
- yield beam_fn_api_pb2.LogEntry.List(log_entries=log_entries)
+ yield beam_fn_api_pb2.LogEntry.List(
+ log_entries=cast(List[beam_fn_api_pb2.LogEntry], log_entries))
Review comment:
We have to cast here because `log_entries` was initialized as a
`List[Union[..., Sentinel]]`. Just above this line we check for the
sentinel and pop it off, but mypy can't track that narrowing.
##########
File path: sdks/python/apache_beam/runners/worker/data_plane.py
##########
@@ -303,9 +334,10 @@ def inverse(self):
def input_elements(self,
instruction_id, # type: str
- unused_expected_inputs=None, # type: Collection[str]
+ unused_expected_inputs, # type: Any
Review comment:
It's more accurate to say `Any` here, since the method doesn't use the argument.
The argument can't be `Optional`, or the method would be incompatible with
its supertype.
I checked all callers of `input_elements` and found none that call it
with only one argument.
##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1619,7 +1633,7 @@ def _create_pardo_operation(
consumers,
output_tags)
if pardo_proto and pardo_proto.restriction_coder_id:
- result.input_info = (
+ result.input_info = operations.OpInputInfo(
Review comment:
Making this a NamedTuple adds a bit of type safety, and the change should be
completely safe.
##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
##########
@@ -174,11 +191,14 @@ def __init__(self,
self._state_handler_factory = GrpcStateHandlerFactory(
self._state_cache, credentials)
self._profiler_factory = profiler_factory
- self._fns = KeyedDefaultDict(
- lambda id: self._control_stub.GetProcessBundleDescriptor(
- beam_fn_api_pb2.GetProcessBundleDescriptorRequest(
- process_bundle_descriptor_id=id))
- ) # type: Mapping[str, beam_fn_api_pb2.ProcessBundleDescriptor]
+
+ def default_factory(id):
+ # type: (str) -> beam_fn_api_pb2.ProcessBundleDescriptor
+ return self._control_stub.GetProcessBundleDescriptor(
+ beam_fn_api_pb2.GetProcessBundleDescriptorRequest(
+ process_bundle_descriptor_id=id))
+
+ self._fns = KeyedDefaultDict(default_factory)
Review comment:
Changing this to a named function lets us add annotations, which silences a
mypy error: mypy can now infer the key/value types of `KeyedDefaultDict`.
##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
##########
@@ -464,16 +496,17 @@ class SdkWorker(object):
def __init__(self,
bundle_processor_cache, # type: BundleProcessorCache
- state_cache_metrics_fn=list,
+ state_cache_metrics_fn=list, # type: Callable[[], Iterable[metrics_pb2.MonitoringInfo]]
profiler_factory=None, # type: Optional[Callable[..., Profile]]
- log_lull_timeout_ns=None,
+ log_lull_timeout_ns=None, # type: Optional[int]
):
+ # type: (...) -> None
self.bundle_processor_cache = bundle_processor_cache
self.state_cache_metrics_fn = state_cache_metrics_fn
self.profiler_factory = profiler_factory
self.log_lull_timeout_ns = (
log_lull_timeout_ns or DEFAULT_LOG_LULL_TIMEOUT_NS)
- self._last_full_thread_dump_secs = 0
+ self._last_full_thread_dump_secs = 0.0
Review comment:
This attribute is later set to the value of `time.time()`, which
is a `float`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]