robertwb commented on a change in pull request #12352: URL: https://github.com/apache/beam/pull/12352#discussion_r470937396
########## File path: sdks/python/apache_beam/examples/snippets/snippets.py ########## @@ -689,8 +689,9 @@ def examples_wordcount_streaming(argv): output = ( lines - | 'DecodeUnicode' >> - beam.FlatMap(lambda encoded: encoded.decode('utf-8')) + | 'DecodeUnicode' >> beam.FlatMap( + lambda encoded: + (encoded if isinstance(encoded, list) else encoded.decode('utf-8'))) Review comment: Why this change? Is encoded actually a list sometimes? (In which case, how did it ever pass?) ########## File path: sdks/python/apache_beam/runners/worker/opcounters.py ########## @@ -224,8 +230,25 @@ def _observable_callback_inner(value, is_encoded=False): return _observable_callback_inner + def type_check(self, value): + # type: (any, bool) -> None + for transform_label, type_constraint_tuple in ( + self.output_type_constraints.items()): + parameter_name, constraint = type_constraint_tuple + try: + _check_instance_type(constraint, value, parameter_name, verbose=True) + except TypeCheckError as e: + if not transform_label.startswith('ParDo'): Review comment: This seems incorrect for Map(...) or Composite/ParDo(...). I don't think we should add a ParDo(...) if it's not already there (in which case we could drop this whole try-catch). ########## File path: sdks/python/apache_beam/transforms/core.py ########## @@ -1465,6 +1465,11 @@ def get_restriction_coder(self): from apache_beam.runners.common import DoFnSignature return DoFnSignature(self.fn).get_restriction_coder() + def _add_type_constraint_from_consumer(self, full_label, input_type_hints): + output_constraints = getattr(self.fn, '_runtime_output_constraints', {}) Review comment: getattr doesn't add the value if it's not present, you'll have to do ``` if not hasattr(...): self.fn._runtime_output_constraints = {} self.fn._runtime_output_constraints[...] = ... ``` (I think this should be caught by tests, could you be sure you have a test that fails before fixing this?) 
########## File path: sdks/python/apache_beam/transforms/ptransform.py ########## @@ -702,6 +702,14 @@ def to_runner_api_pickled(self, unused_context): def runner_api_requires_keyed_input(self): return False + def _add_type_constraint_from_consumer(self, full_label, input_type_hints): + """Adds a consumer transform's input type hints to our output type + constraints, which is used during performance runtime type-checking. + + This is only overridden by ParDo's. Review comment: Likewise, omit this comment about ParDo. ########## File path: sdks/python/apache_beam/transforms/core.py ########## @@ -1465,6 +1465,11 @@ def get_restriction_coder(self): from apache_beam.runners.common import DoFnSignature return DoFnSignature(self.fn).get_restriction_coder() + def _add_type_constraint_from_consumer(self, full_label, input_type_hints): + output_constraints = getattr(self.fn, '_runtime_output_constraints', {}) + output_constraints[full_label] = input_type_hints + self.fn._runtime_output_constraints = output_constraints Review comment: Oh, I see you do it here. I think the logic would be clearer as above. 
########## File path: sdks/python/apache_beam/typehints/typecheck.py ########## @@ -265,3 +268,89 @@ def visit_transform(self, applied_transform): transform.get_type_hints(), applied_transform.full_label), applied_transform.full_label) + + +class PerformanceTypeCheckVisitor(pipeline.PipelineVisitor): + + _in_combine = False + combine_classes = ( + core.CombineFn, + core.CombinePerKey, + core.CombineValuesDoFn, + core.CombineValues, + core.CombineGlobally) + + def enter_composite_transform(self, applied_transform): + if isinstance(applied_transform.transform, self.combine_classes): + self._in_combine = True + + def leave_composite_transform(self, applied_transform): + if isinstance(applied_transform.transform, self.combine_classes): + self._in_combine = False + + def visit_transform(self, applied_transform): + transform = applied_transform.transform + if isinstance(transform, core.ParDo) and not self._in_combine: + # Prefix label with 'ParDo' if necessary Review comment: Don't add this prefix. 
########## File path: sdks/python/apache_beam/typehints/typecheck.py ########## @@ -265,3 +268,89 @@ def visit_transform(self, applied_transform): transform.get_type_hints(), applied_transform.full_label), applied_transform.full_label) + + +class PerformanceTypeCheckVisitor(pipeline.PipelineVisitor): + + _in_combine = False + combine_classes = ( + core.CombineFn, + core.CombinePerKey, + core.CombineValuesDoFn, + core.CombineValues, + core.CombineGlobally) + + def enter_composite_transform(self, applied_transform): + if isinstance(applied_transform.transform, self.combine_classes): + self._in_combine = True + + def leave_composite_transform(self, applied_transform): + if isinstance(applied_transform.transform, self.combine_classes): + self._in_combine = False + + def visit_transform(self, applied_transform): + transform = applied_transform.transform + if isinstance(transform, core.ParDo) and not self._in_combine: + # Prefix label with 'ParDo' if necessary + full_label = applied_transform.full_label + if not full_label.startswith('ParDo'): + full_label = 'ParDo(%s)' % full_label + + # Store output type hints in current transform + transform.fn._runtime_output_constraints = {} + output_type_hints = self.get_output_type_hints(transform) + if output_type_hints: + transform.fn._runtime_output_constraints[full_label] = ( + output_type_hints) + + # Store input type hints in producer transform + producer = applied_transform.inputs[0].producer + input_type_hints = self.get_input_type_hints(transform) + if input_type_hints: + producer.transform._add_type_constraint_from_consumer( + full_label, input_type_hints) + + def get_input_type_hints(self, transform): + type_hints = transform.get_type_hints() + + input_types = None + if type_hints.input_types: + normal_hints, kwarg_hints = type_hints.input_types + if kwarg_hints: + input_types = kwarg_hints + if normal_hints: + input_types = normal_hints + + parameter_name = 'Unknown Parameter' + try: Review comment: This is the one bit 
that might need to be guarded by DoFn. ########## File path: sdks/python/apache_beam/typehints/typecheck.py ########## @@ -265,3 +268,89 @@ def visit_transform(self, applied_transform): transform.get_type_hints(), applied_transform.full_label), applied_transform.full_label) + + +class PerformanceTypeCheckVisitor(pipeline.PipelineVisitor): + + _in_combine = False + combine_classes = ( + core.CombineFn, + core.CombinePerKey, + core.CombineValuesDoFn, + core.CombineValues, + core.CombineGlobally) + + def enter_composite_transform(self, applied_transform): + if isinstance(applied_transform.transform, self.combine_classes): + self._in_combine = True + + def leave_composite_transform(self, applied_transform): + if isinstance(applied_transform.transform, self.combine_classes): + self._in_combine = False + + def visit_transform(self, applied_transform): + transform = applied_transform.transform + if isinstance(transform, core.ParDo) and not self._in_combine: + # Prefix label with 'ParDo' if necessary + full_label = applied_transform.full_label + if not full_label.startswith('ParDo'): + full_label = 'ParDo(%s)' % full_label + + # Store output type hints in current transform + transform.fn._runtime_output_constraints = {} + output_type_hints = self.get_output_type_hints(transform) + if output_type_hints: + transform.fn._runtime_output_constraints[full_label] = ( + output_type_hints) + + # Store input type hints in producer transform Review comment: This does not need to be guarded by if `isinstance(transform, core.ParDo)`, we can do it for all transforms. (Actually, the same could be said of the output type hints if we eliminated the direct access to `_runtime_output_constraints` as mentioned above.) 
########## File path: sdks/python/apache_beam/typehints/typecheck.py ########## @@ -265,3 +268,89 @@ def visit_transform(self, applied_transform): transform.get_type_hints(), applied_transform.full_label), applied_transform.full_label) + + +class PerformanceTypeCheckVisitor(pipeline.PipelineVisitor): + + _in_combine = False + combine_classes = ( + core.CombineFn, + core.CombinePerKey, + core.CombineValuesDoFn, + core.CombineValues, + core.CombineGlobally) + + def enter_composite_transform(self, applied_transform): + if isinstance(applied_transform.transform, self.combine_classes): + self._in_combine = True + Review comment: Also visit this transform to get its typehints, if any? ########## File path: sdks/python/apache_beam/typehints/typecheck.py ########## @@ -265,3 +268,89 @@ def visit_transform(self, applied_transform): transform.get_type_hints(), applied_transform.full_label), applied_transform.full_label) + + +class PerformanceTypeCheckVisitor(pipeline.PipelineVisitor): + + _in_combine = False + combine_classes = ( + core.CombineFn, + core.CombinePerKey, + core.CombineValuesDoFn, + core.CombineValues, + core.CombineGlobally) + + def enter_composite_transform(self, applied_transform): + if isinstance(applied_transform.transform, self.combine_classes): + self._in_combine = True + + def leave_composite_transform(self, applied_transform): + if isinstance(applied_transform.transform, self.combine_classes): + self._in_combine = False + + def visit_transform(self, applied_transform): + transform = applied_transform.transform + if isinstance(transform, core.ParDo) and not self._in_combine: + # Prefix label with 'ParDo' if necessary + full_label = applied_transform.full_label + if not full_label.startswith('ParDo'): + full_label = 'ParDo(%s)' % full_label + + # Store output type hints in current transform + transform.fn._runtime_output_constraints = {} + output_type_hints = self.get_output_type_hints(transform) + if output_type_hints: + 
transform.fn._runtime_output_constraints[full_label] = ( + output_type_hints) + + # Store input type hints in producer transform + producer = applied_transform.inputs[0].producer + input_type_hints = self.get_input_type_hints(transform) + if input_type_hints: + producer.transform._add_type_constraint_from_consumer( + full_label, input_type_hints) + + def get_input_type_hints(self, transform): + type_hints = transform.get_type_hints() + + input_types = None + if type_hints.input_types: + normal_hints, kwarg_hints = type_hints.input_types + if kwarg_hints: + input_types = kwarg_hints + if normal_hints: + input_types = normal_hints + + parameter_name = 'Unknown Parameter' + try: + argspec = inspect.getfullargspec(transform.fn._process_argspec_fn()) + if len(argspec.args): + arg_index = 0 + if argspec.args[0] == 'self': + arg_index = 1 + parameter_name = argspec.args[arg_index] + if isinstance(input_types, dict): + input_types = (input_types[argspec.args[arg_index]], ) + except TypeError: Review comment: Why would TypeError be raised? 
########## File path: sdks/python/apache_beam/typehints/typecheck.py ########## @@ -265,3 +268,89 @@ def visit_transform(self, applied_transform): transform.get_type_hints(), applied_transform.full_label), applied_transform.full_label) + + +class PerformanceTypeCheckVisitor(pipeline.PipelineVisitor): + + _in_combine = False + combine_classes = ( + core.CombineFn, + core.CombinePerKey, + core.CombineValuesDoFn, + core.CombineValues, + core.CombineGlobally) + + def enter_composite_transform(self, applied_transform): + if isinstance(applied_transform.transform, self.combine_classes): + self._in_combine = True + + def leave_composite_transform(self, applied_transform): + if isinstance(applied_transform.transform, self.combine_classes): + self._in_combine = False + + def visit_transform(self, applied_transform): + transform = applied_transform.transform + if isinstance(transform, core.ParDo) and not self._in_combine: + # Prefix label with 'ParDo' if necessary + full_label = applied_transform.full_label + if not full_label.startswith('ParDo'): + full_label = 'ParDo(%s)' % full_label + + # Store output type hints in current transform + transform.fn._runtime_output_constraints = {} + output_type_hints = self.get_output_type_hints(transform) + if output_type_hints: + transform.fn._runtime_output_constraints[full_label] = ( + output_type_hints) + + # Store input type hints in producer transform Review comment: If this results in test failures due to more (incorrect) type hints being applied, feel free to drop a TODO here for future work. 
########## File path: sdks/python/apache_beam/transforms/core.py ########## @@ -1465,6 +1465,11 @@ def get_restriction_coder(self): from apache_beam.runners.common import DoFnSignature return DoFnSignature(self.fn).get_restriction_coder() + def _add_type_constraint_from_consumer(self, full_label, input_type_hints): Review comment: You're recording that the type hint comes from this transform, but are you recording that it comes from the input of this transform? (E.g. if I get a failure about transform B's type hint being violated, it may not be clear if it was its input type or output type that was bad.) ########## File path: sdks/python/apache_beam/runners/worker/operations.py ########## @@ -452,6 +476,14 @@ def str_internal(self, is_recursive=False): return '<%s %s>' % (printable_name, ', '.join(printable_fields)) + def _get_runtime_performance_hints(self): + """Returns any type hints required for performance runtime type-checking. + These type hints are stored in the operation's spec's serialized_fn. Review comment: Omit this line. Different operations could choose to store them in different places (e.g. not all operations even have a serialized_fn). Also, the line about "This is only overridden by DoOperation's." is liable to go out of date (but unlikely to be updated) if we provide this anywhere else, and isn't important to this interface, so can be removed as well. What you might want to document, however, is the type of the return value. 
########## File path: sdks/python/apache_beam/typehints/typecheck.py ########## @@ -265,3 +268,89 @@ def visit_transform(self, applied_transform): transform.get_type_hints(), applied_transform.full_label), applied_transform.full_label) + + +class PerformanceTypeCheckVisitor(pipeline.PipelineVisitor): + + _in_combine = False + combine_classes = ( + core.CombineFn, + core.CombinePerKey, + core.CombineValuesDoFn, + core.CombineValues, + core.CombineGlobally) + + def enter_composite_transform(self, applied_transform): + if isinstance(applied_transform.transform, self.combine_classes): + self._in_combine = True + + def leave_composite_transform(self, applied_transform): + if isinstance(applied_transform.transform, self.combine_classes): + self._in_combine = False + + def visit_transform(self, applied_transform): + transform = applied_transform.transform + if isinstance(transform, core.ParDo) and not self._in_combine: + # Prefix label with 'ParDo' if necessary + full_label = applied_transform.full_label + if not full_label.startswith('ParDo'): + full_label = 'ParDo(%s)' % full_label + + # Store output type hints in current transform + transform.fn._runtime_output_constraints = {} Review comment: This will overwrite any typehint constraints that may have been set earlier. Instead, use the same method. 
########## File path: sdks/python/apache_beam/typehints/typecheck.py ########## @@ -265,3 +268,89 @@ def visit_transform(self, applied_transform): transform.get_type_hints(), applied_transform.full_label), applied_transform.full_label) + + +class PerformanceTypeCheckVisitor(pipeline.PipelineVisitor): + + _in_combine = False + combine_classes = ( + core.CombineFn, + core.CombinePerKey, + core.CombineValuesDoFn, + core.CombineValues, + core.CombineGlobally) + + def enter_composite_transform(self, applied_transform): + if isinstance(applied_transform.transform, self.combine_classes): + self._in_combine = True + + def leave_composite_transform(self, applied_transform): + if isinstance(applied_transform.transform, self.combine_classes): + self._in_combine = False + + def visit_transform(self, applied_transform): + transform = applied_transform.transform + if isinstance(transform, core.ParDo) and not self._in_combine: + # Prefix label with 'ParDo' if necessary + full_label = applied_transform.full_label + if not full_label.startswith('ParDo'): + full_label = 'ParDo(%s)' % full_label + + # Store output type hints in current transform + transform.fn._runtime_output_constraints = {} + output_type_hints = self.get_output_type_hints(transform) + if output_type_hints: + transform.fn._runtime_output_constraints[full_label] = ( + output_type_hints) + + # Store input type hints in producer transform + producer = applied_transform.inputs[0].producer Review comment: Nit: move this line into the if block. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org