This is an automated email from the ASF dual-hosted git repository. tvalentyn pushed a commit to branch revert-15969-combine-type-hints in repository https://gitbox.apache.org/repos/asf/beam.git
commit d71203a01307008114238a8b4506e67ae24a5ad7 Author: tvalentyn <[email protected]> AuthorDate: Mon Nov 29 15:19:04 2021 -0800 Revert "Key-inferable type hints for CombinePerKey." --- sdks/python/apache_beam/pipeline.py | 4 +--- sdks/python/apache_beam/transforms/core.py | 24 ++++++++++------------ .../apache_beam/transforms/ptransform_test.py | 13 ------------ 3 files changed, 12 insertions(+), 29 deletions(-) diff --git a/sdks/python/apache_beam/pipeline.py b/sdks/python/apache_beam/pipeline.py index a512a18..25c02d4 100644 --- a/sdks/python/apache_beam/pipeline.py +++ b/sdks/python/apache_beam/pipeline.py @@ -892,9 +892,7 @@ class Pipeline(object): pcoll.element_type) if (isinstance(output.element_type, typehints.TupleHint.TupleConstraint) and - len(output.element_type.tuple_types) == 2 and - pcoll.element_type.tuple_types[0] == - output.element_type.tuple_types[0]): + len(output.element_type.tuple_types) == 2): output.requires_deterministic_key_coder = ( deterministic_key_coders and transform_node.full_label) for side_input in transform_node.transform.side_inputs: diff --git a/sdks/python/apache_beam/transforms/core.py b/sdks/python/apache_beam/transforms/core.py index bd999f8..c71c00c 100644 --- a/sdks/python/apache_beam/transforms/core.py +++ b/sdks/python/apache_beam/transforms/core.py @@ -2306,20 +2306,18 @@ class CombinePerKey(PTransformWithSideInputs): self.fn, *args, **kwargs) def default_type_hints(self): - result = self.fn.get_type_hints() - k = typehints.TypeVariable('K') - if result.input_types: - args, kwargs = result.input_types - args = (typehints.Tuple[k, args[0]], ) + args[1:] - result = result.with_input_types(*args, **kwargs) - else: - result = result.with_input_types(typehints.Tuple[k, typehints.Any]) - if result.output_types: - main_output_type = result.simple_output_type('') - result = result.with_output_types(typehints.Tuple[k, main_output_type]) + hints = self.fn.get_type_hints() + if hints.input_types: + K = typehints.TypeVariable('K') + args, kwargs = hints.input_types + args = (typehints.Tuple[K, args[0]], ) + args[1:] + hints = hints.with_input_types(*args, **kwargs) else: - result = result.with_output_types(typehints.Tuple[k, typehints.Any]) - return result + K = typehints.Any + if hints.output_types: + main_output_type = hints.simple_output_type('') + hints = hints.with_output_types(typehints.Tuple[K, main_output_type]) + return hints def to_runner_api_parameter( self, diff --git a/sdks/python/apache_beam/transforms/ptransform_test.py b/sdks/python/apache_beam/transforms/ptransform_test.py index 6a139eb..ac4f632 100644 --- a/sdks/python/apache_beam/transforms/ptransform_test.py +++ b/sdks/python/apache_beam/transforms/ptransform_test.py @@ -1839,19 +1839,6 @@ class PTransformTypeCheckTestCase(TypeHintTestCase): assert_that(d, equal_to([6])) self.p.run() - def test_combine_properly_pipeline_type_checks_without_decorator(self): - def sum_ints(ints): - return sum(ints) - - d = ( - self.p - | beam.Create([1, 2, 3]) - | beam.Map(lambda x: ('key', x)) - | beam.CombinePerKey(sum_ints)) - - self.assertEqual(typehints.Tuple[str, typehints.Any], d.element_type) - self.p.run() - def test_combine_func_type_hint_does_not_take_iterable_using_decorator(self): @with_output_types(int) @with_input_types(a=int)
