saavannanavati commented on a change in pull request #12352:
URL: https://github.com/apache/beam/pull/12352#discussion_r466192829



##########
File path: sdks/python/apache_beam/typehints/typecheck.py
##########
@@ -265,3 +268,71 @@ def visit_transform(self, applied_transform):
                 transform.get_type_hints(),
                 applied_transform.full_label),
             applied_transform.full_label)
+
+
+class PerformanceTypeCheckVisitor(pipeline.PipelineVisitor):
+
+  _in_combine = False
+  combine_classes = (
+      core.CombineFn,
+      core.CombinePerKey,
+      core.CombineValuesDoFn,
+      core.CombineValues,
+      core.CombineGlobally)
+
+  def enter_composite_transform(self, applied_transform):
+    if isinstance(applied_transform.transform, self.combine_classes):
+      self._in_combine = True
+
+  def leave_composite_transform(self, applied_transform):
+    if isinstance(applied_transform.transform, self.combine_classes):
+      self._in_combine = False
+
+  def visit_transform(self, applied_transform):
+    transform = applied_transform.transform
+    if isinstance(transform, core.ParDo):
+      if not self._in_combine:
+        transform.fn._full_label = applied_transform.full_label
+        self.store_type_hints(transform)
+
+  def store_type_hints(self, transform):
+    type_hints = transform.get_type_hints()
+
+    input_types = None
+    if type_hints.input_types:
+      normal_hints, kwarg_hints = type_hints.input_types
+
+      if kwarg_hints:
+        input_types = kwarg_hints
+      if normal_hints:
+        input_types = normal_hints
+
+    output_types = None
+    if type_hints.output_types:
+      normal_hints, kwarg_hints = type_hints.output_types
+
+      if kwarg_hints:
+        output_types = kwarg_hints
+      if normal_hints:
+        output_types = normal_hints
+
+    try:
+      argspec = inspect.getfullargspec(transform.fn._process_argspec_fn())
+      if len(argspec.args):
+        arg_index = 0
+        if argspec.args[0] == 'self':
+          arg_index = 1
+        transform.fn._runtime_parameter_name = argspec.args[arg_index]
+        if isinstance(input_types, dict):
+          input_types = (input_types[argspec.args[arg_index]], )
+    except TypeError:
+      pass
+
+    if input_types and len(input_types):
+      input_types = input_types[0]
+
+    if output_types and len(output_types):
+      output_types = output_types[0]
+
+    transform.fn._runtime_type_hints = type_hints._replace(
+        input_types=input_types, output_types=output_types)

Review comment:
   - Through debugging, I've noticed that the type hints are always stored 
in the transforms and not in the DoFns, for some reason. Do you know of a case 
where the DoFn stores the type hints?
   
   - Sounds good, I moved (most of) the pre-processing computation here.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


Reply via email to