saavannanavati commented on a change in pull request #12352:
URL: https://github.com/apache/beam/pull/12352#discussion_r471165054
##########
File path: sdks/python/apache_beam/typehints/typecheck.py
##########
@@ -265,3 +268,89 @@ def visit_transform(self, applied_transform):
transform.get_type_hints(),
applied_transform.full_label),
applied_transform.full_label)
+
+
+class PerformanceTypeCheckVisitor(pipeline.PipelineVisitor):
+
+ _in_combine = False
+ combine_classes = (
+ core.CombineFn,
+ core.CombinePerKey,
+ core.CombineValuesDoFn,
+ core.CombineValues,
+ core.CombineGlobally)
+
+ def enter_composite_transform(self, applied_transform):
+ if isinstance(applied_transform.transform, self.combine_classes):
+ self._in_combine = True
+
+ def leave_composite_transform(self, applied_transform):
+ if isinstance(applied_transform.transform, self.combine_classes):
+ self._in_combine = False
+
+ def visit_transform(self, applied_transform):
+ transform = applied_transform.transform
+ if isinstance(transform, core.ParDo) and not self._in_combine:
+ # Prefix label with 'ParDo' if necessary
+ full_label = applied_transform.full_label
+ if not full_label.startswith('ParDo'):
+ full_label = 'ParDo(%s)' % full_label
+
+ # Store output type hints in current transform
+ transform.fn._runtime_output_constraints = {}
+ output_type_hints = self.get_output_type_hints(transform)
+ if output_type_hints:
+ transform.fn._runtime_output_constraints[full_label] = (
+ output_type_hints)
+
+ # Store input type hints in producer transform
Review comment:
It fails for `Impulse`, `GroupByKey`, `Flatten` and a few other
transforms. Sounds good, added a TODO for now.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]