robertwb commented on a change in pull request #12352: URL: https://github.com/apache/beam/pull/12352#discussion_r468090981
########## File path: sdks/python/apache_beam/typehints/typecheck.py ########## @@ -265,3 +269,95 @@ def visit_transform(self, applied_transform): transform.get_type_hints(), applied_transform.full_label), applied_transform.full_label) + + +class PerformanceTypeCheckVisitor(pipeline.PipelineVisitor): + + _in_combine = False + combine_classes = ( + core.CombineFn, + core.CombinePerKey, + core.CombineValuesDoFn, + core.CombineValues, + core.CombineGlobally) + + def enter_composite_transform(self, applied_transform): + if isinstance(applied_transform.transform, self.combine_classes): + self._in_combine = True + + def leave_composite_transform(self, applied_transform): + if isinstance(applied_transform.transform, self.combine_classes): + self._in_combine = False + + def visit_transform(self, applied_transform): + transform = applied_transform.transform + if isinstance(transform, core.ParDo) and not self._in_combine: + # Store output type hints in current transform + transform.fn._runtime_output_constraints = { + applied_transform.full_label: self.get_output_type_hints(transform) + } + + # Store input type hints in producer transform + producer = applied_transform.inputs[0].producer + if (hasattr(producer, 'transform') and + hasattr(producer.transform, 'fn') and + hasattr(producer.transform.fn, '_runtime_output_constraints')): Review comment: We would want to add this regardless of whether the dict is already present, right? Perhaps that would be simpler with the `_add_producer_type_constraint` method I suggested on the `hasattr(producer.transform, 'fn')` line.
########## File path: sdks/python/apache_beam/typehints/typecheck.py ########## @@ -265,3 +269,95 @@ def visit_transform(self, applied_transform): transform.get_type_hints(), applied_transform.full_label), applied_transform.full_label) + + +class PerformanceTypeCheckVisitor(pipeline.PipelineVisitor): + + _in_combine = False + combine_classes = ( + core.CombineFn, + core.CombinePerKey, + core.CombineValuesDoFn, + core.CombineValues, + core.CombineGlobally) + + def enter_composite_transform(self, applied_transform): + if isinstance(applied_transform.transform, self.combine_classes): + self._in_combine = True + + def leave_composite_transform(self, applied_transform): + if isinstance(applied_transform.transform, self.combine_classes): + self._in_combine = False + + def visit_transform(self, applied_transform): + transform = applied_transform.transform + if isinstance(transform, core.ParDo) and not self._in_combine: + # Store output type hints in current transform + transform.fn._runtime_output_constraints = { + applied_transform.full_label: self.get_output_type_hints(transform) + } + + # Store input type hints in producer transform + producer = applied_transform.inputs[0].producer + if (hasattr(producer, 'transform') and Review comment: Why could this fail? ########## File path: sdks/python/apache_beam/runners/worker/opcounters.py ########## @@ -224,8 +230,25 @@ def _observable_callback_inner(value, is_encoded=False): return _observable_callback_inner + def type_check(self, value): + # type: (any, bool) -> None + for transform_label, type_constraint_tuple in self.output_type_constraints.items(): + parameter_name, constraint = type_constraint_tuple + if constraint is not None: Review comment: Yes, please do that. 
########## File path: sdks/python/apache_beam/typehints/typecheck.py ########## @@ -265,3 +269,95 @@ def visit_transform(self, applied_transform): transform.get_type_hints(), applied_transform.full_label), applied_transform.full_label) + + +class PerformanceTypeCheckVisitor(pipeline.PipelineVisitor): + + _in_combine = False + combine_classes = ( + core.CombineFn, + core.CombinePerKey, + core.CombineValuesDoFn, + core.CombineValues, + core.CombineGlobally) + + def enter_composite_transform(self, applied_transform): + if isinstance(applied_transform.transform, self.combine_classes): + self._in_combine = True + + def leave_composite_transform(self, applied_transform): + if isinstance(applied_transform.transform, self.combine_classes): + self._in_combine = False + + def visit_transform(self, applied_transform): + transform = applied_transform.transform + if isinstance(transform, core.ParDo) and not self._in_combine: + # Store output type hints in current transform + transform.fn._runtime_output_constraints = { + applied_transform.full_label: self.get_output_type_hints(transform) + } + + # Store input type hints in producer transform + producer = applied_transform.inputs[0].producer + if (hasattr(producer, 'transform') and + hasattr(producer.transform, 'fn') and + hasattr(producer.transform.fn, '_runtime_output_constraints')): + producer = producer.transform.fn + producer._runtime_output_constraints[applied_transform.full_label] \ Review comment: Style: in Beam we avoid backslash line breaks. You can use yapf to break your lines for you. (If needed, add `()`s.) ########## File path: sdks/python/apache_beam/runners/common.py ########## @@ -1340,6 +1342,17 @@ def process_outputs( self.per_element_output_counter.add_input(0) return + if isinstance(results, (dict, str, unicode, bytes)): + results_type = type(results).__name__ + raise TypeCheckError( + 'Returning a %s from a ParDo or FlatMap is ' + 'discouraged. Please use list("%s") if you really ' + 'want this behavior.' 
% (results_type, results)) + elif not isinstance(results, collections.Iterable): Review comment: Not yet resolved? ########## File path: sdks/python/apache_beam/runners/worker/opcounters.py ########## @@ -224,8 +230,25 @@ def _observable_callback_inner(value, is_encoded=False): return _observable_callback_inner + def type_check(self, value): + # type: (any, bool) -> None + for transform_label, type_constraint_tuple in self.output_type_constraints.items(): + parameter_name, constraint = type_constraint_tuple + if constraint is not None: + try: + _check_instance_type(constraint, value, parameter_name, verbose=True) + except TypeCheckError as e: + if not transform_label.startswith('ParDo'): Review comment: I don't think we want to mess with this. (Also, if we do, let's push this logic up into the visitor where we decide the label.) ########## File path: sdks/python/apache_beam/runners/worker/operations.py ########## @@ -600,7 +627,8 @@ def _read_side_inputs(self, tags_and_types): self.name_context.step_name, view_options['coder'], i, - suffix='side-input') + suffix='side-input', + producer_type_hints=get_perf_runtime_type_hints(self)) Review comment: This isn't correct, these type hints should not be used for this counter. Make this an optional argument defaulting to None and don't pass it here. 
########## File path: sdks/python/apache_beam/typehints/typecheck.py ########## @@ -265,3 +269,95 @@ def visit_transform(self, applied_transform): transform.get_type_hints(), applied_transform.full_label), applied_transform.full_label) + + +class PerformanceTypeCheckVisitor(pipeline.PipelineVisitor): + + _in_combine = False + combine_classes = ( + core.CombineFn, + core.CombinePerKey, + core.CombineValuesDoFn, + core.CombineValues, + core.CombineGlobally) + + def enter_composite_transform(self, applied_transform): + if isinstance(applied_transform.transform, self.combine_classes): + self._in_combine = True + + def leave_composite_transform(self, applied_transform): + if isinstance(applied_transform.transform, self.combine_classes): + self._in_combine = False + + def visit_transform(self, applied_transform): + transform = applied_transform.transform + if isinstance(transform, core.ParDo) and not self._in_combine: + # Store output type hints in current transform + transform.fn._runtime_output_constraints = { + applied_transform.full_label: self.get_output_type_hints(transform) + } + + # Store input type hints in producer transform + producer = applied_transform.inputs[0].producer + if (hasattr(producer, 'transform') and + hasattr(producer.transform, 'fn') and Review comment: Cleaner to check isinstance(transform, core.ParDo). Even better would be to add a method like `_add_producer_type_constraint` to PTransform, default implementation empty, overridden in ParDo. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org