chadrik commented on a change in pull request #12881:
URL: https://github.com/apache/beam/pull/12881#discussion_r491722757



##########
File path: sdks/python/apache_beam/runners/worker/statecache.py
##########
@@ -44,15 +48,17 @@ class Metrics(object):
   PREFIX = "beam:metric:statecache:"
 
   def __init__(self):
+    # type: () -> None
     self._context = threading.local()
 
   def initialize(self):
+    # type: () -> None
+
     """Needs to be called once per thread to initialize the local metrics 
cache.
     """
     if hasattr(self._context, 'metrics'):
       return  # Already initialized
-    self._context.metrics = collections.defaultdict(
-        int)  # type: DefaultDict[Hashable, int]

Review comment:
       Removed this annotation because it's invalid: you can't add a type 
annotation to an attribute of another object (a non-self attribute).  I've 
noticed there's definitely a need for a typed version of `threading.local`.
   

##########
File path: sdks/python/apache_beam/runners/worker/log_handler.py
##########
@@ -156,9 +174,11 @@ def _write_log_entries(self):
         done = True
         log_entries.pop()
       if log_entries:
-        yield beam_fn_api_pb2.LogEntry.List(log_entries=log_entries)
+        yield beam_fn_api_pb2.LogEntry.List(
+            log_entries=cast(List[beam_fn_api_pb2.LogEntry], log_entries))

Review comment:
       we have to cast here because `log_entries` started out life as a 
`List[Union[..., Sentinel]]`, and just above this line we checked for the 
sentinel and popped it out, but mypy can't track that. 

##########
File path: sdks/python/apache_beam/runners/worker/log_handler.py
##########
@@ -156,9 +174,11 @@ def _write_log_entries(self):
         done = True
         log_entries.pop()
       if log_entries:
-        yield beam_fn_api_pb2.LogEntry.List(log_entries=log_entries)
+        yield beam_fn_api_pb2.LogEntry.List(
+            log_entries=cast(List[beam_fn_api_pb2.LogEntry], log_entries))

Review comment:
       we have to cast here because `log_entries` was initialized as a 
`List[Union[..., Sentinel]]`, and just above this line we checked for the 
sentinel and popped it out, but mypy can't track that. 

##########
File path: sdks/python/apache_beam/runners/worker/data_plane.py
##########
@@ -303,9 +334,10 @@ def inverse(self):
 
   def input_elements(self,
       instruction_id,  # type: str
-      unused_expected_inputs=None,   # type: Collection[str]
+      unused_expected_inputs,   # type: Any

Review comment:
       It's more accurate to say `Any` here, since the method doesn't care.  
The argument can't be `Optional`, or the method would be incompatible with 
its supertype. 
   
   I checked all uses of `input_elements` and I did not see any case where it 
was called with only one arg.  
   

##########
File path: sdks/python/apache_beam/runners/worker/bundle_processor.py
##########
@@ -1619,7 +1633,7 @@ def _create_pardo_operation(
       consumers,
       output_tags)
   if pardo_proto and pardo_proto.restriction_coder_id:
-    result.input_info = (
+    result.input_info = operations.OpInputInfo(

Review comment:
       Making this a NamedTuple added a bit of sanity, and should be completely 
safe.

##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
##########
@@ -174,11 +191,14 @@ def __init__(self,
     self._state_handler_factory = GrpcStateHandlerFactory(
         self._state_cache, credentials)
     self._profiler_factory = profiler_factory
-    self._fns = KeyedDefaultDict(
-        lambda id: self._control_stub.GetProcessBundleDescriptor(
-            beam_fn_api_pb2.GetProcessBundleDescriptorRequest(
-                process_bundle_descriptor_id=id))
-    )  # type: Mapping[str, beam_fn_api_pb2.ProcessBundleDescriptor]
+
+    def default_factory(id):
+      # type: (str) -> beam_fn_api_pb2.ProcessBundleDescriptor
+      return self._control_stub.GetProcessBundleDescriptor(
+          beam_fn_api_pb2.GetProcessBundleDescriptorRequest(
+              process_bundle_descriptor_id=id))
+
+    self._fns = KeyedDefaultDict(default_factory)

Review comment:
       Changing this to a function lets us add annotations, which silences a 
mypy error: mypy can now detect the key/value types of `KeyedDefaultDict`.

##########
File path: sdks/python/apache_beam/runners/worker/sdk_worker.py
##########
@@ -464,16 +496,17 @@ class SdkWorker(object):
 
   def __init__(self,
                bundle_processor_cache,  # type: BundleProcessorCache
-               state_cache_metrics_fn=list,
+               state_cache_metrics_fn=list,  # type: Callable[[], 
Iterable[metrics_pb2.MonitoringInfo]]
                profiler_factory=None,  # type: Optional[Callable[..., Profile]]
-               log_lull_timeout_ns=None,
+               log_lull_timeout_ns=None,  # type: Optional[int]
               ):
+    # type: (...) -> None
     self.bundle_processor_cache = bundle_processor_cache
     self.state_cache_metrics_fn = state_cache_metrics_fn
     self.profiler_factory = profiler_factory
     self.log_lull_timeout_ns = (
         log_lull_timeout_ns or DEFAULT_LOG_LULL_TIMEOUT_NS)
-    self._last_full_thread_dump_secs = 0
+    self._last_full_thread_dump_secs = 0.0

Review comment:
       this attribute eventually gets set to the value of `time.time()` which 
is a `float`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to