saavannanavati commented on a change in pull request #12352: URL: https://github.com/apache/beam/pull/12352#discussion_r468328472
##########
File path: sdks/python/apache_beam/typehints/typecheck.py
##########
@@ -265,3 +269,95 @@ def visit_transform(self, applied_transform):
               transform.get_type_hints(), applied_transform.full_label),
           applied_transform.full_label)
+
+
+class PerformanceTypeCheckVisitor(pipeline.PipelineVisitor):
+
+  _in_combine = False
+  combine_classes = (
+      core.CombineFn,
+      core.CombinePerKey,
+      core.CombineValuesDoFn,
+      core.CombineValues,
+      core.CombineGlobally)
+
+  def enter_composite_transform(self, applied_transform):
+    if isinstance(applied_transform.transform, self.combine_classes):
+      self._in_combine = True
+
+  def leave_composite_transform(self, applied_transform):
+    if isinstance(applied_transform.transform, self.combine_classes):
+      self._in_combine = False
+
+  def visit_transform(self, applied_transform):
+    transform = applied_transform.transform
+    if isinstance(transform, core.ParDo) and not self._in_combine:
+      # Store output type hints in current transform
+      transform.fn._runtime_output_constraints = {
+          applied_transform.full_label: self.get_output_type_hints(transform)
+      }
+
+      # Store input type hints in producer transform
+      producer = applied_transform.inputs[0].producer
+      if (hasattr(producer, 'transform') and

Review comment:
An `Impulse` has no `transform` attribute

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org