This is an automated email from the ASF dual-hosted git repository.
tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new 8926c43 [BEAM-7372] cleanup py2 and py35 codepath from apache_beam/typehints (#14752)
8926c43 is described below
commit 8926c43f20ab19b8e445d519cf424e881365fc6c
Author: yoshiki.obata <[email protected]>
AuthorDate: Thu May 20 09:03:30 2021 +0900
[BEAM-7372] cleanup py2 and py35 codepath from apache_beam/typehints (#14752)
---
sdks/python/apache_beam/typehints/__init__.py | 2 -
sdks/python/apache_beam/typehints/decorators.py | 135 +-----
.../apache_beam/typehints/decorators_test.py | 189 +++++++-
.../apache_beam/typehints/decorators_test_py3.py | 215 ---------
.../typehints/native_type_compatibility.py | 9 -
.../typehints/native_type_compatibility_test.py | 3 -
sdks/python/apache_beam/typehints/opcodes.py | 6 +-
sdks/python/apache_beam/typehints/row_type.py | 2 -
sdks/python/apache_beam/typehints/schemas.py | 13 +-
sdks/python/apache_beam/typehints/schemas_test.py | 18 +-
.../apache_beam/typehints/sharded_key_type.py | 8 +-
.../apache_beam/typehints/sharded_key_type_test.py | 2 -
.../apache_beam/typehints/trivial_inference.py | 13 +-
.../typehints/trivial_inference_test.py | 41 +-
.../typehints/trivial_inference_test_py3.py | 54 ---
sdks/python/apache_beam/typehints/typecheck.py | 23 +-
.../{typecheck_test_py3.py => typecheck_test.py} | 2 -
.../apache_beam/typehints/typed_pipeline_test.py | 496 ++++++++++++++++++-
.../typehints/typed_pipeline_test_py3.py | 535 ---------------------
sdks/python/apache_beam/typehints/typehints.py | 9 +-
.../python/apache_beam/typehints/typehints_test.py | 248 +++++++++-
.../apache_beam/typehints/typehints_test_py3.py | 274 -----------
22 files changed, 958 insertions(+), 1339 deletions(-)
diff --git a/sdks/python/apache_beam/typehints/__init__.py b/sdks/python/apache_beam/typehints/__init__.py
index 23d0b40..e89afa1 100644
--- a/sdks/python/apache_beam/typehints/__init__.py
+++ b/sdks/python/apache_beam/typehints/__init__.py
@@ -17,8 +17,6 @@
"""A package defining the syntax and decorator semantics for type-hints."""
-from __future__ import absolute_import
-
# pylint: disable=wildcard-import
from apache_beam.typehints.typehints import *
from apache_beam.typehints.decorators import *
diff --git a/sdks/python/apache_beam/typehints/decorators.py b/sdks/python/apache_beam/typehints/decorators.py
index 3dc9878..478869d 100644
--- a/sdks/python/apache_beam/typehints/decorators.py
+++ b/sdks/python/apache_beam/typehints/decorators.py
@@ -61,12 +61,6 @@ Example usage for type-hinting both arguments and return values::
def int_to_str(a):
return str(a)
-Type-hinting a function with arguments that unpack tuples are also supported
-(in Python 2 only). As an example, such a function would be defined as::
-
- def foo((a, b)):
- ...
-
The valid type-hint for such as function looks like the following::
@with_input_types(a=int, b=int)
@@ -85,17 +79,12 @@ defined, or before importing a module containing
type-hinted functions.
# pytype: skip-file
-from __future__ import absolute_import
-
import inspect
import itertools
import logging
import sys
import traceback
import types
-from builtins import next
-from builtins import object
-from builtins import zip
from typing import Any
from typing import Callable
from typing import Dict
@@ -115,11 +104,6 @@ from apache_beam.typehints.typehints import SimpleTypeHintError
from apache_beam.typehints.typehints import check_constraint
from apache_beam.typehints.typehints import validate_composite_type_param
-try:
- import funcsigs # Python 2 only.
-except ImportError:
- funcsigs = None
-
__all__ = [
'disable_type_annotations',
'no_annotations',
@@ -142,44 +126,6 @@ _ANY_VAR_POSITIONAL = typehints.Tuple[typehints.Any, ...]
_ANY_VAR_KEYWORD = typehints.Dict[typehints.Any, typehints.Any]
_disable_from_callable = False
-try:
- _original_getfullargspec = inspect.getfullargspec
- _use_full_argspec = True
-except AttributeError: # Python 2
- _original_getfullargspec = inspect.getargspec # type: ignore
- _use_full_argspec = False
-
-
-def getfullargspec(func):
- # Python 3: Use get_signature instead.
- assert sys.version_info < (3, ), 'This method should not be used in Python 3'
- try:
- return _original_getfullargspec(func)
- except TypeError:
- if isinstance(func, type):
- argspec = getfullargspec(func.__init__)
- del argspec.args[0]
- return argspec
- elif callable(func):
- try:
- return _original_getfullargspec(func.__call__)
- except TypeError:
- # Return an ArgSpec with at least one positional argument,
- # and any number of other (positional or keyword) arguments
- # whose name won't match any real argument.
- # Arguments with the %unknown% prefix will be ignored in the type
- # checking code.
- if _use_full_argspec:
- return inspect.FullArgSpec(['_'],
- '__unknown__varargs',
- '__unknown__keywords', (), [], {}, {})
- else: # Python 2
- return inspect.ArgSpec(['_'],
- '__unknown__varargs',
- '__unknown__keywords', ())
- else:
- raise
-
def get_signature(func):
"""Like inspect.signature(), but supports Py2 as well.
@@ -188,26 +134,18 @@ def get_signature(func):
latter: 'the "self" parameter is always reported, even for bound methods'
https://github.com/python/cpython/blob/44f91c388a6f4da9ed3300df32ca290b8aa104ea/Lib/inspect.py#L1103
"""
- # Fall back on funcsigs if inspect module doesn't have 'signature'; prefer
- # inspect.signature over funcsigs.signature if both are available.
- if hasattr(inspect, 'signature'):
- inspect_ = inspect
- else:
- inspect_ = funcsigs
-
try:
- signature = inspect_.signature(func)
+ signature = inspect.signature(func)
except ValueError:
# Fall back on a catch-all signature.
params = [
- inspect_.Parameter('_', inspect_.Parameter.POSITIONAL_OR_KEYWORD),
- inspect_.Parameter(
- '__unknown__varargs', inspect_.Parameter.VAR_POSITIONAL),
- inspect_.Parameter(
- '__unknown__keywords', inspect_.Parameter.VAR_KEYWORD)
+ inspect.Parameter('_', inspect.Parameter.POSITIONAL_OR_KEYWORD),
+ inspect.Parameter(
+ '__unknown__varargs', inspect.Parameter.VAR_POSITIONAL),
+ inspect.Parameter('__unknown__keywords', inspect.Parameter.VAR_KEYWORD)
]
- signature = inspect_.Signature(params)
+ signature = inspect.Signature(params)
# This is a specialization to hint the first argument of certain builtins,
# such as str.strip.
@@ -638,65 +576,6 @@ def _unpack_positional_arg_hints(arg, hint):
return hint
-def getcallargs_forhints(func, *typeargs, **typekwargs):
- """Like inspect.getcallargs, with support for declaring default args as Any.
-
- In Python 2, understands that Tuple[] and an Any unpack.
-
- Returns:
- (Dict[str, Any]) A dictionary from arguments names to values.
- """
- if sys.version_info < (3, ):
- return getcallargs_forhints_impl_py2(func, typeargs, typekwargs)
- else:
- return getcallargs_forhints_impl_py3(func, typeargs, typekwargs)
-
-
-def getcallargs_forhints_impl_py2(func, typeargs, typekwargs):
- argspec = getfullargspec(func)
- # Turn Tuple[x, y] into (x, y) so getcallargs can do the proper unpacking.
- packed_typeargs = [
- _unpack_positional_arg_hints(arg, hint)
- for (arg, hint) in zip(argspec.args, typeargs)
- ]
- packed_typeargs += list(typeargs[len(packed_typeargs):])
-
- # Monkeypatch inspect.getfullargspec to allow passing non-function objects.
- # getfullargspec (getargspec on Python 2) are used by inspect.getcallargs.
- # TODO(BEAM-5490): Reimplement getcallargs and stop relying on monkeypatch.
- inspect.getargspec = getfullargspec
- try:
- callargs = inspect.getcallargs(func, *packed_typeargs, **typekwargs) #
pylint: disable=deprecated-method
- except TypeError as e:
- raise TypeCheckError(e)
- finally:
- # Revert monkey-patch.
- inspect.getargspec = _original_getfullargspec
-
- if argspec.defaults:
- # Declare any default arguments to be Any.
- for k, var in enumerate(reversed(argspec.args)):
- if k >= len(argspec.defaults):
- break
- if callargs.get(var, None) is argspec.defaults[-k - 1]:
- callargs[var] = typehints.Any
- # Patch up varargs and keywords
- if argspec.varargs:
- # TODO(BEAM-8122): This will always assign _ANY_VAR_POSITIONAL. Should be
- # "callargs.get(...) or _ANY_VAR_POSITIONAL".
- callargs[argspec.varargs] = typekwargs.get(
- argspec.varargs, _ANY_VAR_POSITIONAL)
-
- varkw = argspec.keywords
- if varkw:
- # TODO(robertwb): Consider taking the union of key and value types.
- callargs[varkw] = typekwargs.get(varkw, _ANY_VAR_KEYWORD)
-
- # TODO(BEAM-5878) Support kwonlyargs.
-
- return callargs
-
-
def _normalize_var_positional_hint(hint):
"""Converts a var_positional hint into Tuple[Union[<types>], ...] form.
@@ -743,7 +622,7 @@ def _normalize_var_keyword_hint(hint, arg_name):
return typehints.Dict[str, typehints.Union[values]]
-def getcallargs_forhints_impl_py3(func, type_args, type_kwargs):
+def getcallargs_forhints(func, *type_args, **type_kwargs):
"""Bind type_args and type_kwargs to func.
Works like inspect.getcallargs, with some modifications to support type hint
diff --git a/sdks/python/apache_beam/typehints/decorators_test.py b/sdks/python/apache_beam/typehints/decorators_test.py
index 85ca94f..faf0b55 100644
--- a/sdks/python/apache_beam/typehints/decorators_test.py
+++ b/sdks/python/apache_beam/typehints/decorators_test.py
@@ -19,24 +19,31 @@
# pytype: skip-file
-from __future__ import absolute_import
-
-import re
+import functools
import sys
+import typing
import unittest
-import future.tests.base # pylint: disable=unused-import
-
+from apache_beam import Map
from apache_beam.typehints import Any
+from apache_beam.typehints import Dict
from apache_beam.typehints import List
+from apache_beam.typehints import Tuple
+from apache_beam.typehints import TypeCheckError
+from apache_beam.typehints import TypeVariable
from apache_beam.typehints import WithTypeHints
from apache_beam.typehints import decorators
from apache_beam.typehints import typehints
+T = TypeVariable('T')
+# Name is 'T' so it converts to a beam type with the same name.
+# mypy requires that the name of the variable match, so we must ignore this.
+T_typing = typing.TypeVar('T') # type: ignore
+
class IOTypeHintsTest(unittest.TestCase):
def test_get_signature(self):
- # Basic coverage only to make sure function works in Py2 and Py3.
+ # Basic coverage only to make sure function works.
def fn(a, b=1, *c, **d):
return a, b, c, d
@@ -55,7 +62,6 @@ class IOTypeHintsTest(unittest.TestCase):
self.assertEqual(s.return_annotation, List[Any])
def test_from_callable_without_annotations(self):
- # Python 2 doesn't support annotations. See decorators_test_py3.py for
that.
def fn(a, b=None, *args, **kwargs):
return a, b, args, kwargs
@@ -120,8 +126,7 @@ class IOTypeHintsTest(unittest.TestCase):
origin = ''.join(
decorators.IOTypeHints.empty().with_input_types(str).origin)
self.assertRegex(origin, __name__)
- # TODO: use self.assertNotRegex once py2 support is removed.
- self.assertIsNone(re.search(r'\b_make_traceback', origin), msg=origin)
+ self.assertNotRegex(origin, r'\b_make_traceback')
def test_origin(self):
th = decorators.IOTypeHints.empty()
@@ -153,6 +158,140 @@ class IOTypeHintsTest(unittest.TestCase):
th = th.with_defaults(th2)
self.assertNotEqual(expected_id, id(th))
+ def test_from_callable(self):
+ def fn(
+ a: int,
+ b: str = '',
+ *args: Tuple[T],
+ foo: List[int],
+ **kwargs: Dict[str, str]) -> Tuple[Any, ...]:
+ return a, b, args, foo, kwargs
+
+ th = decorators.IOTypeHints.from_callable(fn)
+ self.assertEqual(
+ th.input_types, ((int, str, Tuple[T]), {
+ 'foo': List[int], 'kwargs': Dict[str, str]
+ }))
+ self.assertEqual(th.output_types, ((Tuple[Any, ...], ), {}))
+
+ def test_from_callable_partial_annotations(self):
+ def fn(a: int, b=None, *args, foo: List[int], **kwargs):
+ return a, b, args, foo, kwargs
+
+ th = decorators.IOTypeHints.from_callable(fn)
+ self.assertEqual(
+ th.input_types,
+ ((int, Any, Tuple[Any, ...]), {
+ 'foo': List[int], 'kwargs': Dict[Any, Any]
+ }))
+ self.assertEqual(th.output_types, ((Any, ), {}))
+
+ def test_from_callable_class(self):
+ class Class(object):
+ def __init__(self, unused_arg: int):
+ pass
+
+ th = decorators.IOTypeHints.from_callable(Class)
+ self.assertEqual(th.input_types, ((int, ), {}))
+ self.assertEqual(th.output_types, ((Class, ), {}))
+
+ def test_from_callable_method(self):
+ class Class(object):
+ def method(self, arg: T = None) -> None:
+ pass
+
+ th = decorators.IOTypeHints.from_callable(Class.method)
+ self.assertEqual(th.input_types, ((Any, T), {}))
+ self.assertEqual(th.output_types, ((None, ), {}))
+
+ th = decorators.IOTypeHints.from_callable(Class().method)
+ self.assertEqual(th.input_types, ((T, ), {}))
+ self.assertEqual(th.output_types, ((None, ), {}))
+
+ def test_from_callable_convert_to_beam_types(self):
+ def fn(
+ a: typing.List[int],
+ b: str = '',
+ *args: typing.Tuple[T_typing],
+ foo: typing.List[int],
+ **kwargs: typing.Dict[str, str]) -> typing.Tuple[typing.Any, ...]:
+ return a, b, args, foo, kwargs
+
+ th = decorators.IOTypeHints.from_callable(fn)
+ self.assertEqual(
+ th.input_types,
+ ((List[int], str, Tuple[T]), {
+ 'foo': List[int], 'kwargs': Dict[str, str]
+ }))
+ self.assertEqual(th.output_types, ((Tuple[Any, ...], ), {}))
+
+ def test_from_callable_partial(self):
+ def fn(a: int) -> int:
+ return a
+
+ # functools.partial objects don't have __name__ attributes by default.
+ fn = functools.partial(fn, 1)
+ th = decorators.IOTypeHints.from_callable(fn)
+ self.assertRegex(th.debug_str(), r'unknown')
+
+ def test_getcallargs_forhints(self):
+ def fn(
+ a: int,
+ b: str = '',
+ *args: Tuple[T],
+ foo: List[int],
+ **kwargs: Dict[str, str]) -> Tuple[Any, ...]:
+ return a, b, args, foo, kwargs
+
+ callargs = decorators.getcallargs_forhints(fn, float, foo=List[str])
+ self.assertDictEqual(
+ callargs,
+ {
+ 'a': float,
+ 'b': str,
+ 'args': Tuple[T],
+ 'foo': List[str],
+ 'kwargs': Dict[str, str]
+ })
+
+ def test_getcallargs_forhints_default_arg(self):
+ # Default args are not necessarily types, so they should be ignored.
+ def fn(a=List[int], b=None, *args, foo=(), **kwargs) -> Tuple[Any, ...]:
+ return a, b, args, foo, kwargs
+
+ callargs = decorators.getcallargs_forhints(fn)
+ self.assertDictEqual(
+ callargs,
+ {
+ 'a': Any,
+ 'b': Any,
+ 'args': Tuple[Any, ...],
+ 'foo': Any,
+ 'kwargs': Dict[Any, Any]
+ })
+
+ def test_getcallargs_forhints_missing_arg(self):
+ def fn(a, b=None, *args, foo, **kwargs):
+ return a, b, args, foo, kwargs
+
+ with self.assertRaisesRegex(decorators.TypeCheckError, "missing.*'a'"):
+ decorators.getcallargs_forhints(fn, foo=List[int])
+ with self.assertRaisesRegex(decorators.TypeCheckError, "missing.*'foo'"):
+ decorators.getcallargs_forhints(fn, 5)
+
+ def test_origin(self):
+ def annotated(e: str) -> str:
+ return e
+
+ t = Map(annotated)
+ th = t.get_type_hints()
+ th = th.with_input_types(str)
+ self.assertRegex(th.debug_str(), r'with_input_types')
+ th = th.with_output_types(str)
+ self.assertRegex(
+ th.debug_str(),
+ r'(?s)with_output_types.*with_input_types.*Map.annotated')
+
class WithTypeHintsTest(unittest.TestCase):
def test_get_type_hints_no_settings(self):
@@ -231,5 +370,37 @@ class DecoratorsTest(unittest.TestCase):
self.assertTrue(decorators._disable_from_callable)
+class DecoratorsTest(unittest.TestCase):
+ def test_no_annotations(self):
+ def fn(a: int) -> int:
+ return a
+
+ with self.assertRaisesRegex(TypeCheckError,
+ r'requires .*int.* but got .*str'):
+ _ = ['a', 'b', 'c'] | Map(fn)
+
+ # Same pipeline doesn't raise without annotations on fn.
+ fn = decorators.no_annotations(fn)
+ _ = ['a', 'b', 'c'] | Map(fn)
+
+
+class DecoratorsTest(unittest.TestCase):
+ def test_no_annotations(self):
+ def fn(a: int) -> int:
+ return a
+
+ _ = [1, 2, 3] | Map(fn) # Doesn't raise - correct types.
+
+ with self.assertRaisesRegex(TypeCheckError,
+ r'requires .*int.* but got .*str'):
+ _ = ['a', 'b', 'c'] | Map(fn)
+
+ @decorators.no_annotations
+ def fn2(a: int) -> int:
+ return a
+
+ _ = ['a', 'b', 'c'] | Map(fn2) # Doesn't raise - no input type hints.
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/sdks/python/apache_beam/typehints/decorators_test_py3.py b/sdks/python/apache_beam/typehints/decorators_test_py3.py
deleted file mode 100644
index 024a0d2..0000000
--- a/sdks/python/apache_beam/typehints/decorators_test_py3.py
+++ /dev/null
@@ -1,215 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Tests for decorators module with Python 3 syntax not supported by 2.7."""
-
-# pytype: skip-file
-
-from __future__ import absolute_import
-
-import functools
-import typing
-import unittest
-
-# patches unittest.TestCase to be python3 compatible
-import future.tests.base # pylint: disable=unused-import
-
-from apache_beam import Map
-from apache_beam.typehints import Any
-from apache_beam.typehints import Dict
-from apache_beam.typehints import List
-from apache_beam.typehints import Tuple
-from apache_beam.typehints import TypeCheckError
-from apache_beam.typehints import TypeVariable
-from apache_beam.typehints import decorators
-
-T = TypeVariable('T')
-# Name is 'T' so it converts to a beam type with the same name.
-# mypy requires that the name of the variable match, so we must ignore this.
-T_typing = typing.TypeVar('T') # type: ignore
-
-
-class IOTypeHintsTest(unittest.TestCase):
- def test_from_callable(self):
- def fn(
- a: int,
- b: str = '',
- *args: Tuple[T],
- foo: List[int],
- **kwargs: Dict[str, str]) -> Tuple[Any, ...]:
- return a, b, args, foo, kwargs
-
- th = decorators.IOTypeHints.from_callable(fn)
- self.assertEqual(
- th.input_types, ((int, str, Tuple[T]), {
- 'foo': List[int], 'kwargs': Dict[str, str]
- }))
- self.assertEqual(th.output_types, ((Tuple[Any, ...], ), {}))
-
- def test_from_callable_partial_annotations(self):
- def fn(a: int, b=None, *args, foo: List[int], **kwargs):
- return a, b, args, foo, kwargs
-
- th = decorators.IOTypeHints.from_callable(fn)
- self.assertEqual(
- th.input_types,
- ((int, Any, Tuple[Any, ...]), {
- 'foo': List[int], 'kwargs': Dict[Any, Any]
- }))
- self.assertEqual(th.output_types, ((Any, ), {}))
-
- def test_from_callable_class(self):
- class Class(object):
- def __init__(self, unused_arg: int):
- pass
-
- th = decorators.IOTypeHints.from_callable(Class)
- self.assertEqual(th.input_types, ((int, ), {}))
- self.assertEqual(th.output_types, ((Class, ), {}))
-
- def test_from_callable_method(self):
- class Class(object):
- def method(self, arg: T = None) -> None:
- pass
-
- th = decorators.IOTypeHints.from_callable(Class.method)
- self.assertEqual(th.input_types, ((Any, T), {}))
- self.assertEqual(th.output_types, ((None, ), {}))
-
- th = decorators.IOTypeHints.from_callable(Class().method)
- self.assertEqual(th.input_types, ((T, ), {}))
- self.assertEqual(th.output_types, ((None, ), {}))
-
- def test_from_callable_convert_to_beam_types(self):
- def fn(
- a: typing.List[int],
- b: str = '',
- *args: typing.Tuple[T_typing],
- foo: typing.List[int],
- **kwargs: typing.Dict[str, str]) -> typing.Tuple[typing.Any, ...]:
- return a, b, args, foo, kwargs
-
- th = decorators.IOTypeHints.from_callable(fn)
- self.assertEqual(
- th.input_types,
- ((List[int], str, Tuple[T]), {
- 'foo': List[int], 'kwargs': Dict[str, str]
- }))
- self.assertEqual(th.output_types, ((Tuple[Any, ...], ), {}))
-
- def test_from_callable_partial(self):
- def fn(a: int) -> int:
- return a
-
- # functools.partial objects don't have __name__ attributes by default.
- fn = functools.partial(fn, 1)
- th = decorators.IOTypeHints.from_callable(fn)
- self.assertRegex(th.debug_str(), r'unknown')
-
- def test_getcallargs_forhints(self):
- def fn(
- a: int,
- b: str = '',
- *args: Tuple[T],
- foo: List[int],
- **kwargs: Dict[str, str]) -> Tuple[Any, ...]:
- return a, b, args, foo, kwargs
-
- callargs = decorators.getcallargs_forhints(fn, float, foo=List[str])
- self.assertDictEqual(
- callargs,
- {
- 'a': float,
- 'b': str,
- 'args': Tuple[T],
- 'foo': List[str],
- 'kwargs': Dict[str, str]
- })
-
- def test_getcallargs_forhints_default_arg(self):
- # Default args are not necessarily types, so they should be ignored.
- def fn(a=List[int], b=None, *args, foo=(), **kwargs) -> Tuple[Any, ...]:
- return a, b, args, foo, kwargs
-
- callargs = decorators.getcallargs_forhints(fn)
- self.assertDictEqual(
- callargs,
- {
- 'a': Any,
- 'b': Any,
- 'args': Tuple[Any, ...],
- 'foo': Any,
- 'kwargs': Dict[Any, Any]
- })
-
- def test_getcallargs_forhints_missing_arg(self):
- def fn(a, b=None, *args, foo, **kwargs):
- return a, b, args, foo, kwargs
-
- with self.assertRaisesRegex(decorators.TypeCheckError, "missing.*'a'"):
- decorators.getcallargs_forhints(fn, foo=List[int])
- with self.assertRaisesRegex(decorators.TypeCheckError, "missing.*'foo'"):
- decorators.getcallargs_forhints(fn, 5)
-
- def test_origin(self):
- def annotated(e: str) -> str:
- return e
-
- t = Map(annotated)
- th = t.get_type_hints()
- th = th.with_input_types(str)
- self.assertRegex(th.debug_str(), r'with_input_types')
- th = th.with_output_types(str)
- self.assertRegex(
- th.debug_str(),
- r'(?s)with_output_types.*with_input_types.*Map.annotated')
-
-
-class DecoratorsTest(unittest.TestCase):
- def test_no_annotations(self):
- def fn(a: int) -> int:
- return a
-
- with self.assertRaisesRegex(TypeCheckError,
- r'requires .*int.* but got .*str'):
- _ = ['a', 'b', 'c'] | Map(fn)
-
- # Same pipeline doesn't raise without annotations on fn.
- fn = decorators.no_annotations(fn)
- _ = ['a', 'b', 'c'] | Map(fn)
-
-
-class DecoratorsTest(unittest.TestCase):
- def test_no_annotations(self):
- def fn(a: int) -> int:
- return a
-
- _ = [1, 2, 3] | Map(fn) # Doesn't raise - correct types.
-
- with self.assertRaisesRegex(TypeCheckError,
- r'requires .*int.* but got .*str'):
- _ = ['a', 'b', 'c'] | Map(fn)
-
- @decorators.no_annotations
- def fn2(a: int) -> int:
- return a
-
- _ = ['a', 'b', 'c'] | Map(fn2) # Doesn't raise - no input type hints.
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility.py b/sdks/python/apache_beam/typehints/native_type_compatibility.py
index d77727e..3d69cd6 100644
--- a/sdks/python/apache_beam/typehints/native_type_compatibility.py
+++ b/sdks/python/apache_beam/typehints/native_type_compatibility.py
@@ -19,13 +19,10 @@
# pytype: skip-file
-from __future__ import absolute_import
-
import collections
import logging
import sys
import typing
-from builtins import next
from apache_beam.typehints import typehints
@@ -134,12 +131,6 @@ def _match_is_union(user_type):
if user_type is typing.Union:
return True
- try: # Python 3.5.2
- if isinstance(user_type, typing.UnionMeta):
- return True
- except AttributeError:
- pass
-
try: # Python 3.5.4+, or Python 2.7.14+ with typing 3.64
return user_type.__origin__ is typing.Union
except AttributeError:
diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility_test.py b/sdks/python/apache_beam/typehints/native_type_compatibility_test.py
index bb38eef..577a6ee 100644
--- a/sdks/python/apache_beam/typehints/native_type_compatibility_test.py
+++ b/sdks/python/apache_beam/typehints/native_type_compatibility_test.py
@@ -19,8 +19,6 @@
# pytype: skip-file
-from __future__ import absolute_import
-
import sys
import typing
import unittest
@@ -70,7 +68,6 @@ class NativeTypeCompatibilityTest(unittest.TestCase):
bytes, typing.Union[int, bytes, float]]]],
typehints.Tuple[bytes, typehints.List[typehints.Tuple[
bytes, typehints.Union[int, bytes, float]]]]),
- # TODO(BEAM-7713): This case seems to fail on Py3.5.2 but not 3.5.4.
('arbitrary-length tuple', typing.Tuple[int, ...],
typehints.Tuple[int, ...]),
('flat alias', _TestFlatAlias, typehints.Tuple[bytes, float]), #
type: ignore[misc]
diff --git a/sdks/python/apache_beam/typehints/opcodes.py b/sdks/python/apache_beam/typehints/opcodes.py
index 56b2b53..b8aa75f 100644
--- a/sdks/python/apache_beam/typehints/opcodes.py
+++ b/sdks/python/apache_beam/typehints/opcodes.py
@@ -29,16 +29,12 @@ For internal use only; no backwards-compatibility guarantees.
"""
# pytype: skip-file
-from __future__ import absolute_import
-
import inspect
import logging
import sys
import types
from functools import reduce
-from past.builtins import unicode
-
from apache_beam.typehints import row_type
from apache_beam.typehints import typehints
from apache_beam.typehints.trivial_inference import BoundMethod
@@ -160,7 +156,7 @@ binary_subtract = inplace_subtract = symmetric_binary_op
def binary_subscr(state, unused_arg):
index = state.stack.pop()
base = Const.unwrap(state.stack.pop())
- if base in (str, unicode):
+ if base is str:
out = base
elif (isinstance(index, Const) and isinstance(index.value, int) and
isinstance(base, typehints.IndexableTypeConstraint)):
diff --git a/sdks/python/apache_beam/typehints/row_type.py b/sdks/python/apache_beam/typehints/row_type.py
index f301aed..f7098fb 100644
--- a/sdks/python/apache_beam/typehints/row_type.py
+++ b/sdks/python/apache_beam/typehints/row_type.py
@@ -17,8 +17,6 @@
# pytype: skip-file
-from __future__ import absolute_import
-
from apache_beam.typehints import typehints
diff --git a/sdks/python/apache_beam/typehints/schemas.py b/sdks/python/apache_beam/typehints/schemas.py
index 5daf68a..eae185d 100644
--- a/sdks/python/apache_beam/typehints/schemas.py
+++ b/sdks/python/apache_beam/typehints/schemas.py
@@ -30,7 +30,7 @@ Imposes a mapping between common Python types and Beam portable schemas
np.float64 <-----> DOUBLE
float ------> DOUBLE
bool <-----> BOOLEAN
- str/unicode <-----> STRING
+ str <-----> STRING
bytes <-----> BYTES
ByteString ------> BYTES
Timestamp <-----> LogicalType(urn="beam:logical_type:micros_instant:v1")
@@ -51,8 +51,6 @@ wrapping the type in :code:`Optional`.
# pytype: skip-file
-from __future__ import absolute_import
-
from typing import Any
from typing import ByteString
from typing import Generic
@@ -64,7 +62,6 @@ from typing import TypeVar
from uuid import uuid4
import numpy as np
-from past.builtins import unicode
from apache_beam.portability.api import schema_pb2
from apache_beam.typehints import row_type
@@ -108,7 +105,7 @@ _PRIMITIVES = (
(np.int64, schema_pb2.INT64),
(np.float32, schema_pb2.FLOAT),
(np.float64, schema_pb2.DOUBLE),
- (unicode, schema_pb2.STRING),
+ (str, schema_pb2.STRING),
(bool, schema_pb2.BOOLEAN),
(bytes, schema_pb2.BYTES),
)
@@ -142,7 +139,7 @@ def named_fields_to_schema(names_and_types):
def named_fields_from_schema(
- schema): # (schema_pb2.Schema) -> typing.List[typing.Tuple[unicode, type]]
+ schema): # (schema_pb2.Schema) -> typing.List[typing.Tuple[str, type]]
return [(field.name, typing_from_runner_api(field.type))
for field in schema.fields]
@@ -296,7 +293,7 @@ def schema_from_element_type(element_type): # (type) -> schema_pb2.Schema
def named_fields_from_element_type(
- element_type): # (type) -> typing.List[typing.Tuple[unicode, type]]
+ element_type): # (type) -> typing.List[typing.Tuple[str, type]]
return named_fields_from_schema(schema_from_element_type(element_type))
@@ -332,7 +329,7 @@ class LogicalType(Generic[LanguageT, RepresentationT, ArgT]):
@classmethod
def urn(cls):
- # type: () -> unicode
+ # type: () -> str
"""Return the URN used to identify this logical type"""
raise NotImplementedError()
diff --git a/sdks/python/apache_beam/typehints/schemas_test.py b/sdks/python/apache_beam/typehints/schemas_test.py
index c28f2c5..c662bac 100644
--- a/sdks/python/apache_beam/typehints/schemas_test.py
+++ b/sdks/python/apache_beam/typehints/schemas_test.py
@@ -19,9 +19,8 @@
# pytype: skip-file
-from __future__ import absolute_import
-
import itertools
+import pickle
import unittest
from typing import ByteString
from typing import List
@@ -31,8 +30,6 @@ from typing import Optional
from typing import Sequence
import numpy as np
-from future.moves import pickle
-from past.builtins import unicode
from apache_beam.portability.api import schema_pb2
from apache_beam.typehints.schemas import named_tuple_from_schema
@@ -58,7 +55,6 @@ class SchemaTest(unittest.TestCase):
np.int64,
np.float32,
np.float64,
- unicode,
bool,
bytes,
str,
@@ -85,10 +81,8 @@ class SchemaTest(unittest.TestCase):
'ComplexSchema',
[
('id', np.int64),
- ('name', unicode),
- (
- 'optional_map',
- Optional[Mapping[unicode, Optional[np.float64]]]),
+ ('name', str),
+ ('optional_map', Optional[Mapping[str, Optional[np.float64]]]),
('optional_array', Optional[Sequence[np.float32]]),
('array_optional', Sequence[Optional[bool]]),
('timestamp', Timestamp),
@@ -215,9 +209,9 @@ class SchemaTest(unittest.TestCase):
MyCuteClass = NamedTuple(
'MyCuteClass',
[
- ('name', unicode),
+ ('name', str),
('age', Optional[int]),
- ('interests', List[unicode]),
+ ('interests', List[str]),
('height', float),
('blob', ByteString),
])
@@ -273,7 +267,7 @@ class SchemaTest(unittest.TestCase):
def test_user_type_annotated_with_id_after_conversion(self):
MyCuteClass = NamedTuple('MyCuteClass', [
- ('name', unicode),
+ ('name', str),
])
self.assertFalse(hasattr(MyCuteClass, '_beam_schema_id'))
diff --git a/sdks/python/apache_beam/typehints/sharded_key_type.py b/sdks/python/apache_beam/typehints/sharded_key_type.py
index 72b198d..2461141 100644
--- a/sdks/python/apache_beam/typehints/sharded_key_type.py
+++ b/sdks/python/apache_beam/typehints/sharded_key_type.py
@@ -28,17 +28,13 @@ Mostly for internal use.
# pytype: skip-file
-from __future__ import absolute_import
-
-from six import with_metaclass
-
from apache_beam import coders
from apache_beam.typehints import typehints
from apache_beam.utils.sharded_key import ShardedKey
-class ShardedKeyTypeConstraint(with_metaclass(typehints.GetitemConstructor,
- typehints.TypeConstraint)):
+class ShardedKeyTypeConstraint(typehints.TypeConstraint,
+ metaclass=typehints.GetitemConstructor):
def __init__(self, key_type):
typehints.validate_composite_type_param(
key_type, error_msg_prefix='Parameter to ShardedKeyType hint')
diff --git a/sdks/python/apache_beam/typehints/sharded_key_type_test.py b/sdks/python/apache_beam/typehints/sharded_key_type_test.py
index dce065b..922fcf9 100644
--- a/sdks/python/apache_beam/typehints/sharded_key_type_test.py
+++ b/sdks/python/apache_beam/typehints/sharded_key_type_test.py
@@ -19,8 +19,6 @@
# pytype: skip-file
-from __future__ import absolute_import
-
from apache_beam.typehints import Tuple
from apache_beam.typehints import typehints
from apache_beam.typehints.sharded_key_type import ShardedKeyType
diff --git a/sdks/python/apache_beam/typehints/trivial_inference.py b/sdks/python/apache_beam/typehints/trivial_inference.py
index 2e7f861..cc6534d 100644
--- a/sdks/python/apache_beam/typehints/trivial_inference.py
+++ b/sdks/python/apache_beam/typehints/trivial_inference.py
@@ -21,9 +21,7 @@ For internal use only; no backwards-compatibility guarantees.
"""
# pytype: skip-file
-from __future__ import absolute_import
-from __future__ import print_function
-
+import builtins
import collections
import dis
import inspect
@@ -31,8 +29,6 @@ import pprint
import sys
import traceback
import types
-from builtins import object
-from builtins import zip
from functools import reduce
from apache_beam import pvalue
@@ -40,13 +36,6 @@ from apache_beam.typehints import Any
from apache_beam.typehints import row_type
from apache_beam.typehints import typehints
-# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
-try: # Python 2
- import __builtin__ as builtins
-except ImportError: # Python 3
- import builtins # type: ignore
-# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
-
class TypeInferenceError(ValueError):
"""Error to raise when type inference failed."""
diff --git a/sdks/python/apache_beam/typehints/trivial_inference_test.py
b/sdks/python/apache_beam/typehints/trivial_inference_test.py
index 7f54a40..e03e919 100644
--- a/sdks/python/apache_beam/typehints/trivial_inference_test.py
+++ b/sdks/python/apache_beam/typehints/trivial_inference_test.py
@@ -19,9 +19,6 @@
# pytype: skip-file
-from __future__ import absolute_import
-
-import sys
import types
import unittest
@@ -39,6 +36,21 @@ class TrivialInferenceTest(unittest.TestCase):
expected,
trivial_inference.infer_return_type(f, inputs, debug=True,
depth=depth))
+ def testBuildListUnpack(self):
+ # Lambda uses BUILD_LIST_UNPACK opcode in Python 3.
+ self.assertReturnType(
+ typehints.List[int],
+ lambda _list: [*_list, *_list, *_list], [typehints.List[int]])
+
+ def testBuildTupleUnpack(self):
+ # Lambda uses BUILD_TUPLE_UNPACK opcode in Python 3.
+ # yapf: disable
+ self.assertReturnType(
+ typehints.Tuple[int, str, str],
+ lambda _list1, _list2: (*_list1, *_list2, *_list2),
+ [typehints.List[int], typehints.List[str]])
+ # yapf: enable
+
def testIdentity(self):
self.assertReturnType(int, lambda x: x, [int])
@@ -74,11 +86,9 @@ class TrivialInferenceTest(unittest.TestCase):
self.assertReturnType(str, lambda v: v[::-1], [str])
self.assertReturnType(typehints.Any, lambda v: v[::-1], [typehints.Any])
self.assertReturnType(typehints.Any, lambda v: v[::-1], [object])
- if sys.version_info >= (3, ):
- # Test binary_subscr on a slice of a Const. On Py2.7 this will use the
- # unsupported opcode SLICE+0.
- test_list = ['a', 'b']
- self.assertReturnType(typehints.List[str], lambda: test_list[:], [])
+ # Test binary_subscr on a slice of a Const.
+ test_list = ['a', 'b']
+ self.assertReturnType(typehints.List[str], lambda: test_list[:], [])
def testUnpack(self):
def reverse(a_b):
@@ -104,7 +114,6 @@ class TrivialInferenceTest(unittest.TestCase):
self.assertReturnType(
any_tuple, reverse, [trivial_inference.Const((1, 2, 3))])
- @unittest.skipIf(sys.version_info < (3, ), 'BUILD_MAP better in Python 3')
def testBuildMap(self):
self.assertReturnType(
typehints.Dict[typehints.Any, typehints.Any],
@@ -170,11 +179,7 @@ class TrivialInferenceTest(unittest.TestCase):
self.assertReturnType(
typehints.List[typehints.Union[int, float]],
lambda xs: [x for x in xs], [typehints.Tuple[int, float]])
- if sys.version_info[:2] == (3, 5):
- # A better result requires implementing the MAKE_CLOSURE opcode.
- expected = typehints.Any
- else:
- expected = typehints.List[typehints.Tuple[str, int]]
+ expected = typehints.List[typehints.Tuple[str, int]]
self.assertReturnType(
expected,
lambda kvs: [(kvs[0], v) for v in kvs[1]],
@@ -269,11 +274,7 @@ class TrivialInferenceTest(unittest.TestCase):
def testDictComprehension(self):
fields = []
- if sys.version_info >= (3, 6):
- expected_type = typehints.Dict[typehints.Any, typehints.Any]
- else:
- # For Python 2, just ensure it doesn't crash.
- expected_type = typehints.Any
+ expected_type = typehints.Dict[typehints.Any, typehints.Any]
self.assertReturnType(
expected_type, lambda row: {f: row[f]
for f in fields}, [typehints.Any])
@@ -318,7 +319,6 @@ class TrivialInferenceTest(unittest.TestCase):
x2,
_list: fn(x1, x2, *_list), [str, typehints.List[int]])
- @unittest.skipIf(sys.version_info < (3, 6), 'CALL_FUNCTION_EX is new in 3.6')
def testCallFunctionEx(self):
# Test when fn arguments are built using BUiLD_LIST.
def fn(*args):
@@ -329,7 +329,6 @@ class TrivialInferenceTest(unittest.TestCase):
lambda x1,
x2: fn(*[x1, x2]), [str, float])
- @unittest.skipIf(sys.version_info < (3, 6), 'CALL_FUNCTION_EX is new in 3.6')
def testCallFunctionExKwargs(self):
def fn(x1, x2, **unused_kwargs):
return x1, x2
diff --git a/sdks/python/apache_beam/typehints/trivial_inference_test_py3.py
b/sdks/python/apache_beam/typehints/trivial_inference_test_py3.py
deleted file mode 100644
index e52a7ec..0000000
--- a/sdks/python/apache_beam/typehints/trivial_inference_test_py3.py
+++ /dev/null
@@ -1,54 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Tests for apache_beam.typehints.trivial_inference that use Python 3 syntax.
-"""
-
-# pytype: skip-file
-
-from __future__ import absolute_import
-
-import unittest
-
-from apache_beam.typehints import trivial_inference
-from apache_beam.typehints import typehints
-
-
-class TrivialInferenceTest(unittest.TestCase):
- def assertReturnType(self, expected, f, inputs=(), depth=5):
- self.assertEqual(
- expected,
- trivial_inference.infer_return_type(f, inputs, debug=True,
depth=depth))
-
- def testBuildListUnpack(self):
- # Lambda uses BUILD_LIST_UNPACK opcode in Python 3.
- self.assertReturnType(
- typehints.List[int],
- lambda _list: [*_list, *_list, *_list], [typehints.List[int]])
-
- def testBuildTupleUnpack(self):
- # Lambda uses BUILD_TUPLE_UNPACK opcode in Python 3.
- # yapf: disable
- self.assertReturnType(
- typehints.Tuple[int, str, str],
- lambda _list1, _list2: (*_list1, *_list2, *_list2),
- [typehints.List[int], typehints.List[str]])
- # yapf: enable
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/sdks/python/apache_beam/typehints/typecheck.py
b/sdks/python/apache_beam/typehints/typecheck.py
index d44d0de..6c4ba25 100644
--- a/sdks/python/apache_beam/typehints/typecheck.py
+++ b/sdks/python/apache_beam/typehints/typecheck.py
@@ -22,15 +22,11 @@ For internal use only; no backwards-compatibility
guarantees.
# pytype: skip-file
-from __future__ import absolute_import
-
import collections
import inspect
+import sys
import types
-from future.utils import raise_with_traceback
-from past.builtins import unicode
-
from apache_beam import pipeline
from apache_beam.pvalue import TaggedOutput
from apache_beam.transforms import core
@@ -93,7 +89,8 @@ class OutputCheckWrapperDoFn(AbstractDoFnWrapper):
error_msg = (
'Runtime type violation detected within ParDo(%s): '
'%s' % (self.full_label, e))
- raise_with_traceback(TypeCheckError(error_msg))
+ _, _, tb = sys.exc_info()
+ raise TypeCheckError(error_msg).with_traceback(tb)
else:
return self._check_type(result)
@@ -102,7 +99,7 @@ class OutputCheckWrapperDoFn(AbstractDoFnWrapper):
if output is None:
return output
- elif isinstance(output, (dict, bytes, str, unicode)):
+ elif isinstance(output, (dict, bytes, str)):
object_type = type(output).__name__
raise TypeCheckError(
'Returning a %s from a ParDo or FlatMap is '
@@ -184,13 +181,15 @@ class TypeCheckWrapperDoFn(AbstractDoFnWrapper):
try:
check_constraint(type_constraint, datum)
except CompositeTypeHintError as e:
- raise_with_traceback(TypeCheckError(e.args[0]))
+ _, _, tb = sys.exc_info()
+ raise TypeCheckError(e.args[0]).with_traceback(tb)
except SimpleTypeHintError:
error_msg = (
"According to type-hint expected %s should be of type %s. "
"Instead, received '%s', an instance of type %s." %
(datum_type, type_constraint, datum, type(datum)))
- raise_with_traceback(TypeCheckError(error_msg))
+ _, _, tb = sys.exc_info()
+ raise TypeCheckError(error_msg).with_traceback(tb)
class TypeCheckCombineFn(core.CombineFn):
@@ -220,7 +219,8 @@ class TypeCheckCombineFn(core.CombineFn):
error_msg = (
'Runtime type violation detected within %s: '
'%s' % (self._label, e))
- raise_with_traceback(TypeCheckError(error_msg))
+ _, _, tb = sys.exc_info()
+ raise TypeCheckError(error_msg).with_traceback(tb)
return self._combinefn.add_input(accumulator, element, *args, **kwargs)
def merge_accumulators(self, accumulators, *args, **kwargs):
@@ -239,7 +239,8 @@ class TypeCheckCombineFn(core.CombineFn):
error_msg = (
'Runtime type violation detected within %s: '
'%s' % (self._label, e))
- raise_with_traceback(TypeCheckError(error_msg))
+ _, _, tb = sys.exc_info()
+ raise TypeCheckError(error_msg).with_traceback(tb)
return result
def teardown(self, *args, **kwargs):
diff --git a/sdks/python/apache_beam/typehints/typecheck_test_py3.py
b/sdks/python/apache_beam/typehints/typecheck_test.py
similarity index 99%
rename from sdks/python/apache_beam/typehints/typecheck_test_py3.py
rename to sdks/python/apache_beam/typehints/typecheck_test.py
index 9b5e487..b6897b5 100644
--- a/sdks/python/apache_beam/typehints/typecheck_test_py3.py
+++ b/sdks/python/apache_beam/typehints/typecheck_test.py
@@ -23,8 +23,6 @@ See additional runtime_type_check=True tests in
ptransform_test.py.
# pytype: skip-file
-from __future__ import absolute_import
-
import os
import tempfile
import unittest
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index d9cea6b..b880e49 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -19,15 +19,10 @@
# pytype: skip-file
-from __future__ import absolute_import
-
import sys
import typing
import unittest
-# patches unittest.TestCase to be python3 compatible
-import future.tests.base # pylint: disable=unused-import
-
import apache_beam as beam
from apache_beam import pvalue
from apache_beam import typehints
@@ -44,6 +39,10 @@ from apache_beam.typehints.decorators import get_signature
class MainInputTest(unittest.TestCase):
+ def assertStartswith(self, msg, prefix):
+ self.assertTrue(
+ msg.startswith(prefix), '"%s" does not start with "%s"' % (msg,
prefix))
+
def test_bad_main_input(self):
@typehints.with_input_types(str, int)
def repeat(s, times):
@@ -66,7 +65,7 @@ class MainInputTest(unittest.TestCase):
self.assertEqual([1, 16, 256], sorted(result))
@unittest.skipIf(
- sys.version_info.major >= 3 and sys.version_info < (3, 7, 0),
+ sys.version_info < (3, 7, 0),
'Function signatures for builtins are not available in Python 3 before '
'version 3.7.')
def test_non_function_fails(self):
@@ -100,18 +99,57 @@ class MainInputTest(unittest.TestCase):
r'requires.*int.*got.*str'):
[1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn()))
+ def test_typed_dofn_method(self):
+ class MyDoFn(beam.DoFn):
+ def process(self, element: int) -> typehints.Tuple[str]:
+ return tuple(str(element))
+
+ result = [1, 2, 3] | beam.ParDo(MyDoFn())
+ self.assertEqual(['1', '2', '3'], sorted(result))
+
+ with self.assertRaisesRegex(typehints.TypeCheckError,
+ r'requires.*int.*got.*str'):
+ _ = ['a', 'b', 'c'] | beam.ParDo(MyDoFn())
+
+ with self.assertRaisesRegex(typehints.TypeCheckError,
+ r'requires.*int.*got.*str'):
+ _ = [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn()))
+
+ def test_typed_dofn_method_with_class_decorators(self):
+ # Class decorators take precedence over PEP 484 hints.
+ @typehints.with_input_types(typehints.Tuple[int, int])
+ @typehints.with_output_types(int)
+ class MyDoFn(beam.DoFn):
+ def process(self, element: int) -> typehints.Tuple[str]:
+ yield element[0]
+
+ result = [(1, 2)] | beam.ParDo(MyDoFn())
+ self.assertEqual([1], sorted(result))
+
+ with self.assertRaisesRegex(typehints.TypeCheckError,
+ r'requires.*Tuple\[int, int\].*got.*str'):
+ _ = ['a', 'b', 'c'] | beam.ParDo(MyDoFn())
+
+ with self.assertRaisesRegex(typehints.TypeCheckError,
+ r'requires.*Tuple\[int, int\].*got.*int'):
+ _ = [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn()))
+
def test_typed_callable_iterable_output(self):
- @typehints.with_input_types(int)
- @typehints.with_output_types(typehints.Iterable[typehints.Iterable[str]])
- def do_fn(element):
+ # Only the outer Iterable should be stripped.
+ def do_fn(element: int) -> typehints.Iterable[typehints.Iterable[str]]:
return [[str(element)] * 2]
result = [1, 2] | beam.ParDo(do_fn)
self.assertEqual([['1', '1'], ['2', '2']], sorted(result))
def test_typed_dofn_instance(self):
+ # Type hints applied to DoFn instance take precedence over decorators and
+ # process annotations.
+ @typehints.with_input_types(typehints.Tuple[int, int])
+ @typehints.with_output_types(int)
class MyDoFn(beam.DoFn):
- def process(self, element):
+ def process(self, element: typehints.Tuple[int, int]) -> \
+ typehints.List[int]:
return [str(element)]
my_do_fn = MyDoFn().with_input_types(int).with_output_types(str)
@@ -119,11 +157,34 @@ class MainInputTest(unittest.TestCase):
result = [1, 2, 3] | beam.ParDo(my_do_fn)
self.assertEqual(['1', '2', '3'], sorted(result))
- with self.assertRaises(typehints.TypeCheckError):
- ['a', 'b', 'c'] | beam.ParDo(my_do_fn)
+ with self.assertRaisesRegex(typehints.TypeCheckError,
+ r'requires.*int.*got.*str'):
+ _ = ['a', 'b', 'c'] | beam.ParDo(my_do_fn)
- with self.assertRaises(typehints.TypeCheckError):
- [1, 2, 3] | (beam.ParDo(my_do_fn) | 'again' >> beam.ParDo(my_do_fn))
+ with self.assertRaisesRegex(typehints.TypeCheckError,
+ r'requires.*int.*got.*str'):
+ _ = [1, 2, 3] | (beam.ParDo(my_do_fn) | 'again' >> beam.ParDo(my_do_fn))
+
+ def test_typed_callable_instance(self):
+ # Type hints applied to ParDo instance take precedence over callable
+ # decorators and annotations.
+ @typehints.with_input_types(typehints.Tuple[int, int])
+ @typehints.with_output_types(typehints.Generator[int])
+ def do_fn(element: typehints.Tuple[int, int]) -> typehints.Generator[str]:
+ yield str(element)
+
+ pardo = beam.ParDo(do_fn).with_input_types(int).with_output_types(str)
+
+ result = [1, 2, 3] | pardo
+ self.assertEqual(['1', '2', '3'], sorted(result))
+
+ with self.assertRaisesRegex(typehints.TypeCheckError,
+ r'requires.*int.*got.*str'):
+ _ = ['a', 'b', 'c'] | pardo
+
+ with self.assertRaisesRegex(typehints.TypeCheckError,
+ r'requires.*int.*got.*str'):
+ _ = [1, 2, 3] | (pardo | 'again' >> pardo)
def test_filter_type_hint(self):
@typehints.with_input_types(int)
@@ -264,6 +325,308 @@ class MainInputTest(unittest.TestCase):
'strings': pcoll2, 'integers': pcoll1
} | 'fails' >> multi_input('additional_arg')
+ def test_typed_dofn_method_not_iterable(self):
+ class MyDoFn(beam.DoFn):
+ def process(self, element: int) -> str:
+ return str(element)
+
+ with self.assertRaisesRegex(ValueError, r'str.*is not iterable'):
+ _ = [1, 2, 3] | beam.ParDo(MyDoFn())
+
+ def test_typed_dofn_method_return_none(self):
+ class MyDoFn(beam.DoFn):
+ def process(self, unused_element: int) -> None:
+ pass
+
+ result = [1, 2, 3] | beam.ParDo(MyDoFn())
+ self.assertListEqual([], result)
+
+ def test_typed_dofn_method_return_optional(self):
+ class MyDoFn(beam.DoFn):
+ def process(
+ self,
+ unused_element: int) -> typehints.Optional[typehints.Iterable[int]]:
+ pass
+
+ result = [1, 2, 3] | beam.ParDo(MyDoFn())
+ self.assertListEqual([], result)
+
+ def test_typed_dofn_method_return_optional_not_iterable(self):
+ class MyDoFn(beam.DoFn):
+ def process(self, unused_element: int) -> typehints.Optional[int]:
+ pass
+
+ with self.assertRaisesRegex(ValueError, r'int.*is not iterable'):
+ _ = [1, 2, 3] | beam.ParDo(MyDoFn())
+
+ def test_typed_callable_not_iterable(self):
+ def do_fn(element: int) -> int:
+ return element
+
+ with self.assertRaisesRegex(typehints.TypeCheckError,
+ r'int.*is not iterable'):
+ _ = [1, 2, 3] | beam.ParDo(do_fn)
+
+ def test_typed_dofn_kwonly(self):
+ class MyDoFn(beam.DoFn):
+ # TODO(BEAM-5878): A kwonly argument like
+ # timestamp=beam.DoFn.TimestampParam would not work here.
+ def process(self, element: int, *, side_input: str) -> \
+ typehints.Generator[typehints.Optional[str]]:
+ yield str(element) if side_input else None
+
+ my_do_fn = MyDoFn()
+
+ result = [1, 2, 3] | beam.ParDo(my_do_fn, side_input='abc')
+ self.assertEqual(['1', '2', '3'], sorted(result))
+
+ with self.assertRaisesRegex(typehints.TypeCheckError,
+ r'requires.*str.*got.*int.*side_input'):
+ _ = [1, 2, 3] | beam.ParDo(my_do_fn, side_input=1)
+
+ def test_typed_dofn_var_kwargs(self):
+ class MyDoFn(beam.DoFn):
+ def process(self, element: int, **side_inputs: typehints.Dict[str, str])
\
+ -> typehints.Generator[typehints.Optional[str]]:
+ yield str(element) if side_inputs else None
+
+ my_do_fn = MyDoFn()
+
+ result = [1, 2, 3] | beam.ParDo(my_do_fn, foo='abc', bar='def')
+ self.assertEqual(['1', '2', '3'], sorted(result))
+
+ with self.assertRaisesRegex(typehints.TypeCheckError,
+ r'requires.*str.*got.*int.*side_inputs'):
+ _ = [1, 2, 3] | beam.ParDo(my_do_fn, a=1)
+
+ def test_typed_callable_string_literals(self):
+ def do_fn(element: 'int') -> 'typehints.List[str]':
+ return [[str(element)] * 2]
+
+ result = [1, 2] | beam.ParDo(do_fn)
+ self.assertEqual([['1', '1'], ['2', '2']], sorted(result))
+
+ def test_typed_ptransform_fn(self):
+ # Test that type hints are propagated to the created PTransform.
+ @beam.ptransform_fn
+ @typehints.with_input_types(int)
+ def MyMap(pcoll):
+ def fn(element: int):
+ yield element
+
+ return pcoll | beam.ParDo(fn)
+
+ self.assertListEqual([1, 2, 3], [1, 2, 3] | MyMap())
+ with self.assertRaisesRegex(typehints.TypeCheckError, r'int.*got.*str'):
+ _ = ['a'] | MyMap()
+
+ def test_typed_ptransform_fn_conflicting_hints(self):
+ # In this case, both MyMap and its contained ParDo have separate type
+ # checks (that disagree with each other).
+ @beam.ptransform_fn
+ @typehints.with_input_types(int)
+ def MyMap(pcoll):
+ def fn(element: float):
+ yield element
+
+ return pcoll | beam.ParDo(fn)
+
+ with self.assertRaisesRegex(typehints.TypeCheckError,
+ r'ParDo.*requires.*float.*got.*int'):
+ _ = [1, 2, 3] | MyMap()
+ with self.assertRaisesRegex(typehints.TypeCheckError,
+ r'MyMap.*expected.*int.*got.*str'):
+ _ = ['a'] | MyMap()
+
+ def test_typed_dofn_string_literals(self):
+ class MyDoFn(beam.DoFn):
+ def process(self, element: 'int') -> 'typehints.List[str]':
+ return [[str(element)] * 2]
+
+ result = [1, 2] | beam.ParDo(MyDoFn())
+ self.assertEqual([['1', '1'], ['2', '2']], sorted(result))
+
+ def test_typed_map(self):
+ def fn(element: int) -> int:
+ return element * 2
+
+ result = [1, 2, 3] | beam.Map(fn)
+ self.assertEqual([2, 4, 6], sorted(result))
+
+ def test_typed_map_return_optional(self):
+ # None is a valid element value for Map.
+ def fn(element: int) -> typehints.Optional[int]:
+ if element > 1:
+ return element
+
+ result = [1, 2, 3] | beam.Map(fn)
+ self.assertCountEqual([None, 2, 3], result)
+
+ def test_typed_flatmap(self):
+ def fn(element: int) -> typehints.Iterable[int]:
+ yield element * 2
+
+ result = [1, 2, 3] | beam.FlatMap(fn)
+ self.assertCountEqual([2, 4, 6], result)
+
+ def test_typed_flatmap_output_hint_not_iterable(self):
+ def fn(element: int) -> int:
+ return element * 2
+
+ # This is raised (originally) in strip_iterable.
+ with self.assertRaisesRegex(typehints.TypeCheckError,
+ r'int.*is not iterable'):
+ _ = [1, 2, 3] | beam.FlatMap(fn)
+
+ def test_typed_flatmap_output_value_not_iterable(self):
+ def fn(element: int) -> typehints.Iterable[int]:
+ return element * 2
+
+ # This is raised in runners/common.py (process_outputs).
+ with self.assertRaisesRegex(TypeError, r'int.*is not iterable'):
+ _ = [1, 2, 3] | beam.FlatMap(fn)
+
+ def test_typed_flatmap_optional(self):
+ def fn(element: int) -> typehints.Optional[typehints.Iterable[int]]:
+ if element > 1:
+ yield element * 2
+
+ # Verify that the output type of fn is int and not Optional[int].
+ def fn2(element: int) -> int:
+ return element
+
+ result = [1, 2, 3] | beam.FlatMap(fn) | beam.Map(fn2)
+ self.assertCountEqual([4, 6], result)
+
+ def test_typed_ptransform_with_no_error(self):
+ class StrToInt(beam.PTransform):
+ def expand(
+ self,
+ pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[int]:
+ return pcoll | beam.Map(lambda x: int(x))
+
+ class IntToStr(beam.PTransform):
+ def expand(
+ self,
+ pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
+ return pcoll | beam.Map(lambda x: str(x))
+
+ _ = ['1', '2', '3'] | StrToInt() | IntToStr()
+
+ def test_typed_ptransform_with_bad_typehints(self):
+ class StrToInt(beam.PTransform):
+ def expand(
+ self,
+ pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[int]:
+ return pcoll | beam.Map(lambda x: int(x))
+
+ class IntToStr(beam.PTransform):
+ def expand(
+ self,
+ pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[str]:
+ return pcoll | beam.Map(lambda x: str(x))
+
+ with self.assertRaisesRegex(typehints.TypeCheckError,
+ "Input type hint violation at IntToStr: "
+ "expected <class 'str'>, got <class 'int'>"):
+ _ = ['1', '2', '3'] | StrToInt() | IntToStr()
+
+ def test_typed_ptransform_with_bad_input(self):
+ class StrToInt(beam.PTransform):
+ def expand(
+ self,
+ pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[int]:
+ return pcoll | beam.Map(lambda x: int(x))
+
+ class IntToStr(beam.PTransform):
+ def expand(
+ self,
+ pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
+ return pcoll | beam.Map(lambda x: str(x))
+
+ with self.assertRaisesRegex(typehints.TypeCheckError,
+ "Input type hint violation at StrToInt: "
+ "expected <class 'str'>, got <class 'int'>"):
+ # Feed integers to a PTransform that expects strings
+ _ = [1, 2, 3] | StrToInt() | IntToStr()
+
+ def test_typed_ptransform_with_partial_typehints(self):
+ class StrToInt(beam.PTransform):
+ def expand(self, pcoll) -> beam.pvalue.PCollection[int]:
+ return pcoll | beam.Map(lambda x: int(x))
+
+ class IntToStr(beam.PTransform):
+ def expand(
+ self,
+ pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
+ return pcoll | beam.Map(lambda x: str(x))
+
+ # Feed integers to a PTransform that should expect strings
+ # but has no typehints so it expects any
+ _ = [1, 2, 3] | StrToInt() | IntToStr()
+
+ def test_typed_ptransform_with_bare_wrappers(self):
+ class StrToInt(beam.PTransform):
+ def expand(
+ self, pcoll: beam.pvalue.PCollection) -> beam.pvalue.PCollection:
+ return pcoll | beam.Map(lambda x: int(x))
+
+ class IntToStr(beam.PTransform):
+ def expand(
+ self,
+ pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
+ return pcoll | beam.Map(lambda x: str(x))
+
+ _ = [1, 2, 3] | StrToInt() | IntToStr()
+
+ def test_typed_ptransform_with_no_typehints(self):
+ class StrToInt(beam.PTransform):
+ def expand(self, pcoll):
+ return pcoll | beam.Map(lambda x: int(x))
+
+ class IntToStr(beam.PTransform):
+ def expand(
+ self,
+ pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
+ return pcoll | beam.Map(lambda x: str(x))
+
+ # Feed integers to a PTransform that should expect strings
+ # but has no typehints so it expects any
+ _ = [1, 2, 3] | StrToInt() | IntToStr()
+
+ def test_typed_ptransform_with_generic_annotations(self):
+ T = typing.TypeVar('T')
+
+ class IntToInt(beam.PTransform):
+ def expand(
+ self,
+ pcoll: beam.pvalue.PCollection[T]) -> beam.pvalue.PCollection[T]:
+ return pcoll | beam.Map(lambda x: x)
+
+ class IntToStr(beam.PTransform):
+ def expand(
+ self,
+ pcoll: beam.pvalue.PCollection[T]) -> beam.pvalue.PCollection[str]:
+ return pcoll | beam.Map(lambda x: str(x))
+
+ _ = [1, 2, 3] | IntToInt() | IntToStr()
+
+ def test_typed_ptransform_with_do_outputs_tuple_compiles(self):
+ class MyDoFn(beam.DoFn):
+ def process(self, element: int, *args, **kwargs):
+ if element % 2:
+ yield beam.pvalue.TaggedOutput('odd', 1)
+ else:
+ yield beam.pvalue.TaggedOutput('even', 1)
+
+ class MyPTransform(beam.PTransform):
+ def expand(self, pcoll: beam.pvalue.PCollection[int]):
+ return pcoll | beam.ParDo(MyDoFn()).with_outputs('odd', 'even')
+
+ # This test fails if you remove the following line from ptransform.py
+ # if isinstance(pvalue_, DoOutputsTuple): continue
+ _ = [1, 2, 3] | MyPTransform()
+
class NativeTypesTest(unittest.TestCase):
def test_good_main_input(self):
@@ -519,6 +882,111 @@ class AnnotationsTest(unittest.TestCase):
self.assertIsNone(th.input_types)
self.assertIsNone(th.output_types)
+ def test_pardo_dofn(self):
+ class MyDoFn(beam.DoFn):
+ def process(self, element: int) -> typehints.Generator[str]:
+ yield str(element)
+
+ th = beam.ParDo(MyDoFn()).get_type_hints()
+ self.assertEqual(th.input_types, ((int, ), {}))
+ self.assertEqual(th.output_types, ((str, ), {}))
+
+ def test_pardo_dofn_not_iterable(self):
+ class MyDoFn(beam.DoFn):
+ def process(self, element: int) -> str:
+ return str(element)
+
+ with self.assertRaisesRegex(ValueError, r'str.*is not iterable'):
+ _ = beam.ParDo(MyDoFn()).get_type_hints()
+
+ def test_pardo_wrapper(self):
+ def do_fn(element: int) -> typehints.Iterable[str]:
+ return [str(element)]
+
+ th = beam.ParDo(do_fn).get_type_hints()
+ self.assertEqual(th.input_types, ((int, ), {}))
+ self.assertEqual(th.output_types, ((str, ), {}))
+
+ def test_pardo_wrapper_tuple(self):
+ # Test case for callables that return key-value pairs for GBK. The outer
+ # Iterable should be stripped but the inner Tuple left intact.
+ def do_fn(element: int) -> typehints.Iterable[typehints.Tuple[str, int]]:
+ return [(str(element), element)]
+
+ th = beam.ParDo(do_fn).get_type_hints()
+ self.assertEqual(th.input_types, ((int, ), {}))
+ self.assertEqual(th.output_types, ((typehints.Tuple[str, int], ), {}))
+
+ def test_pardo_wrapper_not_iterable(self):
+ def do_fn(element: int) -> str:
+ return str(element)
+
+ with self.assertRaisesRegex(typehints.TypeCheckError,
+ r'str.*is not iterable'):
+ _ = beam.ParDo(do_fn).get_type_hints()
+
+ def test_flat_map_wrapper(self):
+ def map_fn(element: int) -> typehints.Iterable[int]:
+ return [element, element + 1]
+
+ th = beam.FlatMap(map_fn).get_type_hints()
+ self.assertEqual(th.input_types, ((int, ), {}))
+ self.assertEqual(th.output_types, ((int, ), {}))
+
+ def test_flat_map_wrapper_optional_output(self):
+ # Optional should not affect output type (Nones are ignored).
+ def map_fn(element: int) -> typehints.Optional[typehints.Iterable[int]]:
+ return [element, element + 1]
+
+ th = beam.FlatMap(map_fn).get_type_hints()
+ self.assertEqual(th.input_types, ((int, ), {}))
+ self.assertEqual(th.output_types, ((int, ), {}))
+
+ @unittest.skip('BEAM-8662: Py3 annotations not yet supported for MapTuple')
+ def test_flat_map_tuple_wrapper(self):
+ # TODO(BEAM-8662): Also test with a fn that accepts default arguments.
+ def tuple_map_fn(a: str, b: str, c: str) -> typehints.Iterable[str]:
+ return [a, b, c]
+
+ th = beam.FlatMapTuple(tuple_map_fn).get_type_hints()
+ self.assertEqual(th.input_types, ((str, str, str), {}))
+ self.assertEqual(th.output_types, ((str, ), {}))
+
+ def test_map_wrapper(self):
+ def map_fn(unused_element: int) -> int:
+ return 1
+
+ th = beam.Map(map_fn).get_type_hints()
+ self.assertEqual(th.input_types, ((int, ), {}))
+ self.assertEqual(th.output_types, ((int, ), {}))
+
+ def test_map_wrapper_optional_output(self):
+ # Optional does affect output type (Nones are NOT ignored).
+ def map_fn(unused_element: int) -> typehints.Optional[int]:
+ return 1
+
+ th = beam.Map(map_fn).get_type_hints()
+ self.assertEqual(th.input_types, ((int, ), {}))
+ self.assertEqual(th.output_types, ((typehints.Optional[int], ), {}))
+
+ @unittest.skip('BEAM-8662: Py3 annotations not yet supported for MapTuple')
+ def test_map_tuple(self):
+ # TODO(BEAM-8662): Also test with a fn that accepts default arguments.
+ def tuple_map_fn(a: str, b: str, c: str) -> str:
+ return a + b + c
+
+ th = beam.MapTuple(tuple_map_fn).get_type_hints()
+ self.assertEqual(th.input_types, ((str, str, str), {}))
+ self.assertEqual(th.output_types, ((str, ), {}))
+
+ def test_filter_wrapper(self):
+ def filter_fn(element: int) -> bool:
+ return bool(element % 2)
+
+ th = beam.Filter(filter_fn).get_type_hints()
+ self.assertEqual(th.input_types, ((int, ), {}))
+ self.assertEqual(th.output_types, ((int, ), {}))
+
if __name__ == '__main__':
unittest.main()
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test_py3.py
b/sdks/python/apache_beam/typehints/typed_pipeline_test_py3.py
deleted file mode 100644
index 6c45c5c..0000000
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test_py3.py
+++ /dev/null
@@ -1,535 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for type-hint objects and decorators - Python 3 syntax specific.
-"""
-
-# pytype: skip-file
-
-from __future__ import absolute_import
-
-import typing
-import unittest
-
-import apache_beam as beam
-from apache_beam import typehints
-
-
-class MainInputTest(unittest.TestCase):
- def assertStartswith(self, msg, prefix):
- self.assertTrue(
- msg.startswith(prefix), '"%s" does not start with "%s"' % (msg,
prefix))
-
- def test_typed_dofn_method(self):
- class MyDoFn(beam.DoFn):
- def process(self, element: int) -> typehints.Tuple[str]:
- return tuple(str(element))
-
- result = [1, 2, 3] | beam.ParDo(MyDoFn())
- self.assertEqual(['1', '2', '3'], sorted(result))
-
- with self.assertRaisesRegex(typehints.TypeCheckError,
- r'requires.*int.*got.*str'):
- _ = ['a', 'b', 'c'] | beam.ParDo(MyDoFn())
-
- with self.assertRaisesRegex(typehints.TypeCheckError,
- r'requires.*int.*got.*str'):
- _ = [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn()))
-
- def test_typed_dofn_method_with_class_decorators(self):
- # Class decorators take precedence over PEP 484 hints.
- @typehints.with_input_types(typehints.Tuple[int, int])
- @typehints.with_output_types(int)
- class MyDoFn(beam.DoFn):
- def process(self, element: int) -> typehints.Tuple[str]:
- yield element[0]
-
- result = [(1, 2)] | beam.ParDo(MyDoFn())
- self.assertEqual([1], sorted(result))
-
- with self.assertRaisesRegex(typehints.TypeCheckError,
- r'requires.*Tuple\[int, int\].*got.*str'):
- _ = ['a', 'b', 'c'] | beam.ParDo(MyDoFn())
-
- with self.assertRaisesRegex(typehints.TypeCheckError,
- r'requires.*Tuple\[int, int\].*got.*int'):
- _ = [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn()))
-
- def test_typed_dofn_instance(self):
- # Type hints applied to DoFn instance take precedence over decorators and
- # process annotations.
- @typehints.with_input_types(typehints.Tuple[int, int])
- @typehints.with_output_types(int)
- class MyDoFn(beam.DoFn):
- def process(self, element: typehints.Tuple[int, int]) -> \
- typehints.List[int]:
- return [str(element)]
-
- my_do_fn = MyDoFn().with_input_types(int).with_output_types(str)
-
- result = [1, 2, 3] | beam.ParDo(my_do_fn)
- self.assertEqual(['1', '2', '3'], sorted(result))
-
- with self.assertRaisesRegex(typehints.TypeCheckError,
- r'requires.*int.*got.*str'):
- _ = ['a', 'b', 'c'] | beam.ParDo(my_do_fn)
-
- with self.assertRaisesRegex(typehints.TypeCheckError,
- r'requires.*int.*got.*str'):
- _ = [1, 2, 3] | (beam.ParDo(my_do_fn) | 'again' >> beam.ParDo(my_do_fn))
-
- def test_typed_callable_instance(self):
- # Type hints applied to ParDo instance take precedence over callable
- # decorators and annotations.
- @typehints.with_input_types(typehints.Tuple[int, int])
- @typehints.with_output_types(typehints.Generator[int])
- def do_fn(element: typehints.Tuple[int, int]) -> typehints.Generator[str]:
- yield str(element)
-
- pardo = beam.ParDo(do_fn).with_input_types(int).with_output_types(str)
-
- result = [1, 2, 3] | pardo
- self.assertEqual(['1', '2', '3'], sorted(result))
-
- with self.assertRaisesRegex(typehints.TypeCheckError,
- r'requires.*int.*got.*str'):
- _ = ['a', 'b', 'c'] | pardo
-
- with self.assertRaisesRegex(typehints.TypeCheckError,
- r'requires.*int.*got.*str'):
- _ = [1, 2, 3] | (pardo | 'again' >> pardo)
-
- def test_typed_callable_iterable_output(self):
- # Only the outer Iterable should be stripped.
- def do_fn(element: int) -> typehints.Iterable[typehints.Iterable[str]]:
- return [[str(element)] * 2]
-
- result = [1, 2] | beam.ParDo(do_fn)
- self.assertEqual([['1', '1'], ['2', '2']], sorted(result))
-
- def test_typed_dofn_method_not_iterable(self):
- class MyDoFn(beam.DoFn):
- def process(self, element: int) -> str:
- return str(element)
-
- with self.assertRaisesRegex(ValueError, r'str.*is not iterable'):
- _ = [1, 2, 3] | beam.ParDo(MyDoFn())
-
- def test_typed_dofn_method_return_none(self):
- class MyDoFn(beam.DoFn):
- def process(self, unused_element: int) -> None:
- pass
-
- result = [1, 2, 3] | beam.ParDo(MyDoFn())
- self.assertListEqual([], result)
-
- def test_typed_dofn_method_return_optional(self):
- class MyDoFn(beam.DoFn):
- def process(
- self,
- unused_element: int) -> typehints.Optional[typehints.Iterable[int]]:
- pass
-
- result = [1, 2, 3] | beam.ParDo(MyDoFn())
- self.assertListEqual([], result)
-
- def test_typed_dofn_method_return_optional_not_iterable(self):
- class MyDoFn(beam.DoFn):
- def process(self, unused_element: int) -> typehints.Optional[int]:
- pass
-
- with self.assertRaisesRegex(ValueError, r'int.*is not iterable'):
- _ = [1, 2, 3] | beam.ParDo(MyDoFn())
-
- def test_typed_callable_not_iterable(self):
- def do_fn(element: int) -> int:
- return element
-
- with self.assertRaisesRegex(typehints.TypeCheckError,
- r'int.*is not iterable'):
- _ = [1, 2, 3] | beam.ParDo(do_fn)
-
- def test_typed_dofn_kwonly(self):
- class MyDoFn(beam.DoFn):
- # TODO(BEAM-5878): A kwonly argument like
- # timestamp=beam.DoFn.TimestampParam would not work here.
- def process(self, element: int, *, side_input: str) -> \
- typehints.Generator[typehints.Optional[str]]:
- yield str(element) if side_input else None
-
- my_do_fn = MyDoFn()
-
- result = [1, 2, 3] | beam.ParDo(my_do_fn, side_input='abc')
- self.assertEqual(['1', '2', '3'], sorted(result))
-
- with self.assertRaisesRegex(typehints.TypeCheckError,
- r'requires.*str.*got.*int.*side_input'):
- _ = [1, 2, 3] | beam.ParDo(my_do_fn, side_input=1)
-
- def test_typed_dofn_var_kwargs(self):
- class MyDoFn(beam.DoFn):
- def process(self, element: int, **side_inputs: typehints.Dict[str, str])
\
- -> typehints.Generator[typehints.Optional[str]]:
- yield str(element) if side_inputs else None
-
- my_do_fn = MyDoFn()
-
- result = [1, 2, 3] | beam.ParDo(my_do_fn, foo='abc', bar='def')
- self.assertEqual(['1', '2', '3'], sorted(result))
-
- with self.assertRaisesRegex(typehints.TypeCheckError,
- r'requires.*str.*got.*int.*side_inputs'):
- _ = [1, 2, 3] | beam.ParDo(my_do_fn, a=1)
-
- def test_typed_callable_string_literals(self):
- def do_fn(element: 'int') -> 'typehints.List[str]':
- return [[str(element)] * 2]
-
- result = [1, 2] | beam.ParDo(do_fn)
- self.assertEqual([['1', '1'], ['2', '2']], sorted(result))
-
- def test_typed_ptransform_fn(self):
- # Test that type hints are propagated to the created PTransform.
- @beam.ptransform_fn
- @typehints.with_input_types(int)
- def MyMap(pcoll):
- def fn(element: int):
- yield element
-
- return pcoll | beam.ParDo(fn)
-
- self.assertListEqual([1, 2, 3], [1, 2, 3] | MyMap())
- with self.assertRaisesRegex(typehints.TypeCheckError, r'int.*got.*str'):
- _ = ['a'] | MyMap()
-
- def test_typed_ptransform_fn_conflicting_hints(self):
- # In this case, both MyMap and its contained ParDo have separate type
- # checks (that disagree with each other).
- @beam.ptransform_fn
- @typehints.with_input_types(int)
- def MyMap(pcoll):
- def fn(element: float):
- yield element
-
- return pcoll | beam.ParDo(fn)
-
- with self.assertRaisesRegex(typehints.TypeCheckError,
- r'ParDo.*requires.*float.*got.*int'):
- _ = [1, 2, 3] | MyMap()
- with self.assertRaisesRegex(typehints.TypeCheckError,
- r'MyMap.*expected.*int.*got.*str'):
- _ = ['a'] | MyMap()
-
- def test_typed_dofn_string_literals(self):
- class MyDoFn(beam.DoFn):
- def process(self, element: 'int') -> 'typehints.List[str]':
- return [[str(element)] * 2]
-
- result = [1, 2] | beam.ParDo(MyDoFn())
- self.assertEqual([['1', '1'], ['2', '2']], sorted(result))
-
- def test_typed_map(self):
- def fn(element: int) -> int:
- return element * 2
-
- result = [1, 2, 3] | beam.Map(fn)
- self.assertEqual([2, 4, 6], sorted(result))
-
- def test_typed_map_return_optional(self):
- # None is a valid element value for Map.
- def fn(element: int) -> typehints.Optional[int]:
- if element > 1:
- return element
-
- result = [1, 2, 3] | beam.Map(fn)
- self.assertCountEqual([None, 2, 3], result)
-
- def test_typed_flatmap(self):
- def fn(element: int) -> typehints.Iterable[int]:
- yield element * 2
-
- result = [1, 2, 3] | beam.FlatMap(fn)
- self.assertCountEqual([2, 4, 6], result)
-
- def test_typed_flatmap_output_hint_not_iterable(self):
- def fn(element: int) -> int:
- return element * 2
-
- # This is raised (originally) in strip_iterable.
- with self.assertRaisesRegex(typehints.TypeCheckError,
- r'int.*is not iterable'):
- _ = [1, 2, 3] | beam.FlatMap(fn)
-
- def test_typed_flatmap_output_value_not_iterable(self):
- def fn(element: int) -> typehints.Iterable[int]:
- return element * 2
-
- # This is raised in runners/common.py (process_outputs).
- with self.assertRaisesRegex(TypeError, r'int.*is not iterable'):
- _ = [1, 2, 3] | beam.FlatMap(fn)
-
- def test_typed_flatmap_optional(self):
- def fn(element: int) -> typehints.Optional[typehints.Iterable[int]]:
- if element > 1:
- yield element * 2
-
- # Verify that the output type of fn is int and not Optional[int].
- def fn2(element: int) -> int:
- return element
-
- result = [1, 2, 3] | beam.FlatMap(fn) | beam.Map(fn2)
- self.assertCountEqual([4, 6], result)
-
- def test_typed_ptransform_with_no_error(self):
- class StrToInt(beam.PTransform):
- def expand(
- self,
- pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[int]:
- return pcoll | beam.Map(lambda x: int(x))
-
- class IntToStr(beam.PTransform):
- def expand(
- self,
- pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
- return pcoll | beam.Map(lambda x: str(x))
-
- _ = ['1', '2', '3'] | StrToInt() | IntToStr()
-
- def test_typed_ptransform_with_bad_typehints(self):
- class StrToInt(beam.PTransform):
- def expand(
- self,
- pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[int]:
- return pcoll | beam.Map(lambda x: int(x))
-
- class IntToStr(beam.PTransform):
- def expand(
- self,
- pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[str]:
- return pcoll | beam.Map(lambda x: str(x))
-
- with self.assertRaisesRegex(typehints.TypeCheckError,
- "Input type hint violation at IntToStr: "
- "expected <class 'str'>, got <class 'int'>"):
- _ = ['1', '2', '3'] | StrToInt() | IntToStr()
-
- def test_typed_ptransform_with_bad_input(self):
- class StrToInt(beam.PTransform):
- def expand(
- self,
- pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[int]:
- return pcoll | beam.Map(lambda x: int(x))
-
- class IntToStr(beam.PTransform):
- def expand(
- self,
- pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
- return pcoll | beam.Map(lambda x: str(x))
-
- with self.assertRaisesRegex(typehints.TypeCheckError,
- "Input type hint violation at StrToInt: "
- "expected <class 'str'>, got <class 'int'>"):
- # Feed integers to a PTransform that expects strings
- _ = [1, 2, 3] | StrToInt() | IntToStr()
-
- def test_typed_ptransform_with_partial_typehints(self):
- class StrToInt(beam.PTransform):
- def expand(self, pcoll) -> beam.pvalue.PCollection[int]:
- return pcoll | beam.Map(lambda x: int(x))
-
- class IntToStr(beam.PTransform):
- def expand(
- self,
- pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
- return pcoll | beam.Map(lambda x: str(x))
-
- # Feed integers to a PTransform that should expect strings
- # but has no typehints so it expects any
- _ = [1, 2, 3] | StrToInt() | IntToStr()
-
- def test_typed_ptransform_with_bare_wrappers(self):
- class StrToInt(beam.PTransform):
- def expand(
- self, pcoll: beam.pvalue.PCollection) -> beam.pvalue.PCollection:
- return pcoll | beam.Map(lambda x: int(x))
-
- class IntToStr(beam.PTransform):
- def expand(
- self,
- pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
- return pcoll | beam.Map(lambda x: str(x))
-
- _ = [1, 2, 3] | StrToInt() | IntToStr()
-
- def test_typed_ptransform_with_no_typehints(self):
- class StrToInt(beam.PTransform):
- def expand(self, pcoll):
- return pcoll | beam.Map(lambda x: int(x))
-
- class IntToStr(beam.PTransform):
- def expand(
- self,
- pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
- return pcoll | beam.Map(lambda x: str(x))
-
- # Feed integers to a PTransform that should expect strings
- # but has no typehints so it expects any
- _ = [1, 2, 3] | StrToInt() | IntToStr()
-
- def test_typed_ptransform_with_generic_annotations(self):
- T = typing.TypeVar('T')
-
- class IntToInt(beam.PTransform):
- def expand(
- self,
- pcoll: beam.pvalue.PCollection[T]) -> beam.pvalue.PCollection[T]:
- return pcoll | beam.Map(lambda x: x)
-
- class IntToStr(beam.PTransform):
- def expand(
- self,
- pcoll: beam.pvalue.PCollection[T]) -> beam.pvalue.PCollection[str]:
- return pcoll | beam.Map(lambda x: str(x))
-
- _ = [1, 2, 3] | IntToInt() | IntToStr()
-
- def test_typed_ptransform_with_do_outputs_tuple_compiles(self):
- class MyDoFn(beam.DoFn):
- def process(self, element: int, *args, **kwargs):
- if element % 2:
- yield beam.pvalue.TaggedOutput('odd', 1)
- else:
- yield beam.pvalue.TaggedOutput('even', 1)
-
- class MyPTransform(beam.PTransform):
- def expand(self, pcoll: beam.pvalue.PCollection[int]):
- return pcoll | beam.ParDo(MyDoFn()).with_outputs('odd', 'even')
-
- # This test fails if you remove the following line from ptransform.py
- # if isinstance(pvalue_, DoOutputsTuple): continue
- _ = [1, 2, 3] | MyPTransform()
-
-
-class AnnotationsTest(unittest.TestCase):
- def test_pardo_dofn(self):
- class MyDoFn(beam.DoFn):
- def process(self, element: int) -> typehints.Generator[str]:
- yield str(element)
-
- th = beam.ParDo(MyDoFn()).get_type_hints()
- self.assertEqual(th.input_types, ((int, ), {}))
- self.assertEqual(th.output_types, ((str, ), {}))
-
- def test_pardo_dofn_not_iterable(self):
- class MyDoFn(beam.DoFn):
- def process(self, element: int) -> str:
- return str(element)
-
- with self.assertRaisesRegex(ValueError, r'str.*is not iterable'):
- _ = beam.ParDo(MyDoFn()).get_type_hints()
-
- def test_pardo_wrapper(self):
- def do_fn(element: int) -> typehints.Iterable[str]:
- return [str(element)]
-
- th = beam.ParDo(do_fn).get_type_hints()
- self.assertEqual(th.input_types, ((int, ), {}))
- self.assertEqual(th.output_types, ((str, ), {}))
-
- def test_pardo_wrapper_tuple(self):
- # Test case for callables that return key-value pairs for GBK. The outer
- # Iterable should be stripped but the inner Tuple left intact.
- def do_fn(element: int) -> typehints.Iterable[typehints.Tuple[str, int]]:
- return [(str(element), element)]
-
- th = beam.ParDo(do_fn).get_type_hints()
- self.assertEqual(th.input_types, ((int, ), {}))
- self.assertEqual(th.output_types, ((typehints.Tuple[str, int], ), {}))
-
- def test_pardo_wrapper_not_iterable(self):
- def do_fn(element: int) -> str:
- return str(element)
-
- with self.assertRaisesRegex(typehints.TypeCheckError,
- r'str.*is not iterable'):
- _ = beam.ParDo(do_fn).get_type_hints()
-
- def test_flat_map_wrapper(self):
- def map_fn(element: int) -> typehints.Iterable[int]:
- return [element, element + 1]
-
- th = beam.FlatMap(map_fn).get_type_hints()
- self.assertEqual(th.input_types, ((int, ), {}))
- self.assertEqual(th.output_types, ((int, ), {}))
-
- def test_flat_map_wrapper_optional_output(self):
- # Optional should not affect output type (Nones are ignored).
- def map_fn(element: int) -> typehints.Optional[typehints.Iterable[int]]:
- return [element, element + 1]
-
- th = beam.FlatMap(map_fn).get_type_hints()
- self.assertEqual(th.input_types, ((int, ), {}))
- self.assertEqual(th.output_types, ((int, ), {}))
-
- @unittest.skip('BEAM-8662: Py3 annotations not yet supported for MapTuple')
- def test_flat_map_tuple_wrapper(self):
- # TODO(BEAM-8662): Also test with a fn that accepts default arguments.
- def tuple_map_fn(a: str, b: str, c: str) -> typehints.Iterable[str]:
- return [a, b, c]
-
- th = beam.FlatMapTuple(tuple_map_fn).get_type_hints()
- self.assertEqual(th.input_types, ((str, str, str), {}))
- self.assertEqual(th.output_types, ((str, ), {}))
-
- def test_map_wrapper(self):
- def map_fn(unused_element: int) -> int:
- return 1
-
- th = beam.Map(map_fn).get_type_hints()
- self.assertEqual(th.input_types, ((int, ), {}))
- self.assertEqual(th.output_types, ((int, ), {}))
-
- def test_map_wrapper_optional_output(self):
- # Optional does affect output type (Nones are NOT ignored).
- def map_fn(unused_element: int) -> typehints.Optional[int]:
- return 1
-
- th = beam.Map(map_fn).get_type_hints()
- self.assertEqual(th.input_types, ((int, ), {}))
- self.assertEqual(th.output_types, ((typehints.Optional[int], ), {}))
-
- @unittest.skip('BEAM-8662: Py3 annotations not yet supported for MapTuple')
- def test_map_tuple(self):
- # TODO(BEAM-8662): Also test with a fn that accepts default arguments.
- def tuple_map_fn(a: str, b: str, c: str) -> str:
- return a + b + c
-
- th = beam.MapTuple(tuple_map_fn).get_type_hints()
- self.assertEqual(th.input_types, ((str, str, str), {}))
- self.assertEqual(th.output_types, ((str, ), {}))
-
- def test_filter_wrapper(self):
- def filter_fn(element: int) -> bool:
- return bool(element % 2)
-
- th = beam.Filter(filter_fn).get_type_hints()
- self.assertEqual(th.input_types, ((int, ), {}))
- self.assertEqual(th.output_types, ((int, ), {}))
-
-
-if __name__ == '__main__':
- unittest.main()
diff --git a/sdks/python/apache_beam/typehints/typehints.py
b/sdks/python/apache_beam/typehints/typehints.py
index 615a847..3ff8224 100644
--- a/sdks/python/apache_beam/typehints/typehints.py
+++ b/sdks/python/apache_beam/typehints/typehints.py
@@ -65,16 +65,10 @@ In addition, type-hints can be used to implement run-time
type-checking via the
# pytype: skip-file
-from __future__ import absolute_import
-
import collections
import copy
import logging
import typing
-from builtins import next
-from builtins import zip
-
-from future.utils import with_metaclass
__all__ = [
'Any',
@@ -1038,8 +1032,7 @@ class IteratorHint(CompositeTypeHint):
IteratorTypeConstraint = IteratorHint.IteratorTypeConstraint
-class WindowedTypeConstraint(with_metaclass(GetitemConstructor, TypeConstraint)
- ): # type: ignore[misc]
+class WindowedTypeConstraint(TypeConstraint, metaclass=GetitemConstructor):
"""A type constraint for WindowedValue objects.
Mostly for internal use.
diff --git a/sdks/python/apache_beam/typehints/typehints_test.py
b/sdks/python/apache_beam/typehints/typehints_test.py
index 4ff3040..a469175 100644
--- a/sdks/python/apache_beam/typehints/typehints_test.py
+++ b/sdks/python/apache_beam/typehints/typehints_test.py
@@ -19,19 +19,21 @@
# pytype: skip-file
-from __future__ import absolute_import
-
import functools
import sys
+import typing
import unittest
-from builtins import next
-from builtins import range
-
-# patches unittest.TestCase to be python3 compatible
-import future.tests.base # pylint: disable=unused-import
import apache_beam.typehints.typehints as typehints
+from apache_beam import Map
+from apache_beam import PTransform
+from apache_beam.pvalue import PBegin
+from apache_beam.pvalue import PCollection
+from apache_beam.pvalue import PDone
+from apache_beam.transforms.core import DoFn
+from apache_beam.typehints import KV
from apache_beam.typehints import Any
+from apache_beam.typehints import Iterable
from apache_beam.typehints import Tuple
from apache_beam.typehints import TypeCheckError
from apache_beam.typehints import Union
@@ -1283,5 +1285,237 @@ class TestCoerceToKvType(TypeHintTestCase):
typehints.coerce_to_kv_type(*args)
+class TestParDoAnnotations(unittest.TestCase):
+ def test_with_side_input(self):
+ class MyDoFn(DoFn):
+ def process(self, element: float, side_input: str) -> \
+ Iterable[KV[str, float]]:
+ pass
+
+ th = MyDoFn().get_type_hints()
+ self.assertEqual(th.input_types, ((float, str), {}))
+ self.assertEqual(th.output_types, ((KV[str, float], ), {}))
+
+ def test_pep484_annotations(self):
+ class MyDoFn(DoFn):
+ def process(self, element: int) -> Iterable[str]:
+ pass
+
+ th = MyDoFn().get_type_hints()
+ self.assertEqual(th.input_types, ((int, ), {}))
+ self.assertEqual(th.output_types, ((str, ), {}))
+
+
+class TestPTransformAnnotations(unittest.TestCase):
+ def test_pep484_annotations(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll: PCollection[int]) -> PCollection[str]:
+ return pcoll | Map(lambda num: str(num))
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(th.input_types, ((int, ), {}))
+ self.assertEqual(th.output_types, ((str, ), {}))
+
+ def test_annotations_without_input_pcollection_wrapper(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll: int) -> PCollection[str]:
+ return pcoll | Map(lambda num: str(num))
+
+ error_str = (
+ r'This input type hint will be ignored and not used for '
+ r'type-checking purposes. Typically, input type hints for a '
+ r'PTransform are single (or nested) types wrapped by a '
+ r'PCollection, or PBegin. Got: {} instead.'.format(int))
+
+ with self.assertLogs(level='WARN') as log:
+ MyPTransform().get_type_hints()
+ self.assertIn(error_str, log.output[0])
+
+ def test_annotations_without_output_pcollection_wrapper(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll: PCollection[int]) -> str:
+ return pcoll | Map(lambda num: str(num))
+
+ error_str = (
+ r'This output type hint will be ignored and not used for '
+ r'type-checking purposes. Typically, output type hints for a '
+ r'PTransform are single (or nested) types wrapped by a '
+ r'PCollection, PDone, or None. Got: {} instead.'.format(str))
+
+ with self.assertLogs(level='WARN') as log:
+ th = MyPTransform().get_type_hints()
+ self.assertIn(error_str, log.output[0])
+ self.assertEqual(th.input_types, ((int, ), {}))
+ self.assertEqual(th.output_types, None)
+
+ def test_annotations_without_input_internal_type(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll: PCollection) -> PCollection[str]:
+ return pcoll | Map(lambda num: str(num))
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(th.input_types, ((Any, ), {}))
+ self.assertEqual(th.output_types, ((str, ), {}))
+
+ def test_annotations_without_output_internal_type(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll: PCollection[int]) -> PCollection:
+ return pcoll | Map(lambda num: str(num))
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(th.input_types, ((int, ), {}))
+ self.assertEqual(th.output_types, ((Any, ), {}))
+
+ def test_annotations_without_any_internal_type(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll: PCollection) -> PCollection:
+ return pcoll | Map(lambda num: str(num))
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(th.input_types, ((Any, ), {}))
+ self.assertEqual(th.output_types, ((Any, ), {}))
+
+ def test_annotations_without_input_typehint(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll) -> PCollection[str]:
+ return pcoll | Map(lambda num: str(num))
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(th.input_types, ((Any, ), {}))
+ self.assertEqual(th.output_types, ((str, ), {}))
+
+ def test_annotations_without_output_typehint(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll: PCollection[int]):
+ return pcoll | Map(lambda num: str(num))
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(th.input_types, ((int, ), {}))
+ self.assertEqual(th.output_types, ((Any, ), {}))
+
+ def test_annotations_without_any_typehints(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll):
+ return pcoll | Map(lambda num: str(num))
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(th.input_types, None)
+ self.assertEqual(th.output_types, None)
+
+ def test_annotations_with_pbegin(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll: PBegin):
+ return pcoll | Map(lambda num: str(num))
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(th.input_types, ((Any, ), {}))
+ self.assertEqual(th.output_types, ((Any, ), {}))
+
+ def test_annotations_with_pdone(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll) -> PDone:
+ return pcoll | Map(lambda num: str(num))
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(th.input_types, ((Any, ), {}))
+ self.assertEqual(th.output_types, ((Any, ), {}))
+
+ def test_annotations_with_none_input(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll: None) -> PCollection[str]:
+ return pcoll | Map(lambda num: str(num))
+
+ error_str = (
+ r'This input type hint will be ignored and not used for '
+ r'type-checking purposes. Typically, input type hints for a '
+ r'PTransform are single (or nested) types wrapped by a '
+ r'PCollection, or PBegin. Got: {} instead.'.format(None))
+
+ with self.assertLogs(level='WARN') as log:
+ th = MyPTransform().get_type_hints()
+ self.assertIn(error_str, log.output[0])
+ self.assertEqual(th.input_types, None)
+ self.assertEqual(th.output_types, ((str, ), {}))
+
+ def test_annotations_with_none_output(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll) -> None:
+ return pcoll | Map(lambda num: str(num))
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(th.input_types, ((Any, ), {}))
+ self.assertEqual(th.output_types, ((Any, ), {}))
+
+ def test_annotations_with_arbitrary_output(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll) -> str:
+ return pcoll | Map(lambda num: str(num))
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(th.input_types, ((Any, ), {}))
+ self.assertEqual(th.output_types, None)
+
+ def test_annotations_with_arbitrary_input_and_output(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll: int) -> str:
+ return pcoll | Map(lambda num: str(num))
+
+ input_error_str = (
+ r'This input type hint will be ignored and not used for '
+ r'type-checking purposes. Typically, input type hints for a '
+ r'PTransform are single (or nested) types wrapped by a '
+ r'PCollection, or PBegin. Got: {} instead.'.format(int))
+
+ output_error_str = (
+ r'This output type hint will be ignored and not used for '
+ r'type-checking purposes. Typically, output type hints for a '
+ r'PTransform are single (or nested) types wrapped by a '
+ r'PCollection, PDone, or None. Got: {} instead.'.format(str))
+
+ with self.assertLogs(level='WARN') as log:
+ th = MyPTransform().get_type_hints()
+ self.assertIn(input_error_str, log.output[0])
+ self.assertIn(output_error_str, log.output[1])
+ self.assertEqual(th.input_types, None)
+ self.assertEqual(th.output_types, None)
+
+ def test_typing_module_annotations_are_converted_to_beam_annotations(self):
+ class MyPTransform(PTransform):
+ def expand(
+ self, pcoll: PCollection[typing.Dict[str, str]]
+ ) -> PCollection[typing.Dict[str, str]]:
+ return pcoll
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(th.input_types, ((typehints.Dict[str, str], ), {}))
+ self.assertEqual(th.output_types, ((typehints.Dict[str, str], ), {}))
+
+ def test_nested_typing_annotations_are_converted_to_beam_annotations(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll:
+ PCollection[typing.Union[int, typing.Any, typing.Dict[str, float]]]) \
+ -> PCollection[typing.Union[int, typing.Any, typing.Dict[str, float]]]:
+ return pcoll
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(
+ th.input_types,
+ ((typehints.Union[int, typehints.Any, typehints.Dict[str,
+ float]], ), {}))
+ self.assertEqual(
+ th.output_types,
+ ((typehints.Union[int, typehints.Any, typehints.Dict[str,
+ float]], ), {}))
+
+ def test_mixed_annotations_are_converted_to_beam_annotations(self):
+ class MyPTransform(PTransform):
+ def expand(self, pcoll: typing.Any) -> typehints.Any:
+ return pcoll
+
+ th = MyPTransform().get_type_hints()
+ self.assertEqual(th.input_types, ((typehints.Any, ), {}))
+ self.assertEqual(th.output_types, ((typehints.Any, ), {}))
+
+
if __name__ == '__main__':
unittest.main()
diff --git a/sdks/python/apache_beam/typehints/typehints_test_py3.py
b/sdks/python/apache_beam/typehints/typehints_test_py3.py
deleted file mode 100644
index 5a36330..0000000
--- a/sdks/python/apache_beam/typehints/typehints_test_py3.py
+++ /dev/null
@@ -1,274 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for the type-hint objects and decorators with Python 3 syntax not
-supported by 2.7."""
-
-# pytype: skip-file
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-import typing
-import unittest
-
-import apache_beam.typehints.typehints as typehints
-from apache_beam import Map
-from apache_beam import PTransform
-from apache_beam.pvalue import PBegin
-from apache_beam.pvalue import PCollection
-from apache_beam.pvalue import PDone
-from apache_beam.transforms.core import DoFn
-from apache_beam.typehints import KV
-from apache_beam.typehints import Iterable
-from apache_beam.typehints.typehints import Any
-
-
-class TestParDoAnnotations(unittest.TestCase):
- def test_with_side_input(self):
- class MyDoFn(DoFn):
- def process(self, element: float, side_input: str) -> \
- Iterable[KV[str, float]]:
- pass
-
- th = MyDoFn().get_type_hints()
- self.assertEqual(th.input_types, ((float, str), {}))
- self.assertEqual(th.output_types, ((KV[str, float], ), {}))
-
- def test_pep484_annotations(self):
- class MyDoFn(DoFn):
- def process(self, element: int) -> Iterable[str]:
- pass
-
- th = MyDoFn().get_type_hints()
- self.assertEqual(th.input_types, ((int, ), {}))
- self.assertEqual(th.output_types, ((str, ), {}))
-
-
-class TestPTransformAnnotations(unittest.TestCase):
- def test_pep484_annotations(self):
- class MyPTransform(PTransform):
- def expand(self, pcoll: PCollection[int]) -> PCollection[str]:
- return pcoll | Map(lambda num: str(num))
-
- th = MyPTransform().get_type_hints()
- self.assertEqual(th.input_types, ((int, ), {}))
- self.assertEqual(th.output_types, ((str, ), {}))
-
- def test_annotations_without_input_pcollection_wrapper(self):
- class MyPTransform(PTransform):
- def expand(self, pcoll: int) -> PCollection[str]:
- return pcoll | Map(lambda num: str(num))
-
- error_str = (
- r'This input type hint will be ignored and not used for '
- r'type-checking purposes. Typically, input type hints for a '
- r'PTransform are single (or nested) types wrapped by a '
- r'PCollection, or PBegin. Got: {} instead.'.format(int))
-
- with self.assertLogs(level='WARN') as log:
- MyPTransform().get_type_hints()
- self.assertIn(error_str, log.output[0])
-
- def test_annotations_without_output_pcollection_wrapper(self):
- class MyPTransform(PTransform):
- def expand(self, pcoll: PCollection[int]) -> str:
- return pcoll | Map(lambda num: str(num))
-
- error_str = (
- r'This output type hint will be ignored and not used for '
- r'type-checking purposes. Typically, output type hints for a '
- r'PTransform are single (or nested) types wrapped by a '
- r'PCollection, PDone, or None. Got: {} instead.'.format(str))
-
- with self.assertLogs(level='WARN') as log:
- th = MyPTransform().get_type_hints()
- self.assertIn(error_str, log.output[0])
- self.assertEqual(th.input_types, ((int, ), {}))
- self.assertEqual(th.output_types, None)
-
- def test_annotations_without_input_internal_type(self):
- class MyPTransform(PTransform):
- def expand(self, pcoll: PCollection) -> PCollection[str]:
- return pcoll | Map(lambda num: str(num))
-
- th = MyPTransform().get_type_hints()
- self.assertEqual(th.input_types, ((Any, ), {}))
- self.assertEqual(th.output_types, ((str, ), {}))
-
- def test_annotations_without_output_internal_type(self):
- class MyPTransform(PTransform):
- def expand(self, pcoll: PCollection[int]) -> PCollection:
- return pcoll | Map(lambda num: str(num))
-
- th = MyPTransform().get_type_hints()
- self.assertEqual(th.input_types, ((int, ), {}))
- self.assertEqual(th.output_types, ((Any, ), {}))
-
- def test_annotations_without_any_internal_type(self):
- class MyPTransform(PTransform):
- def expand(self, pcoll: PCollection) -> PCollection:
- return pcoll | Map(lambda num: str(num))
-
- th = MyPTransform().get_type_hints()
- self.assertEqual(th.input_types, ((Any, ), {}))
- self.assertEqual(th.output_types, ((Any, ), {}))
-
- def test_annotations_without_input_typehint(self):
- class MyPTransform(PTransform):
- def expand(self, pcoll) -> PCollection[str]:
- return pcoll | Map(lambda num: str(num))
-
- th = MyPTransform().get_type_hints()
- self.assertEqual(th.input_types, ((Any, ), {}))
- self.assertEqual(th.output_types, ((str, ), {}))
-
- def test_annotations_without_output_typehint(self):
- class MyPTransform(PTransform):
- def expand(self, pcoll: PCollection[int]):
- return pcoll | Map(lambda num: str(num))
-
- th = MyPTransform().get_type_hints()
- self.assertEqual(th.input_types, ((int, ), {}))
- self.assertEqual(th.output_types, ((Any, ), {}))
-
- def test_annotations_without_any_typehints(self):
- class MyPTransform(PTransform):
- def expand(self, pcoll):
- return pcoll | Map(lambda num: str(num))
-
- th = MyPTransform().get_type_hints()
- self.assertEqual(th.input_types, None)
- self.assertEqual(th.output_types, None)
-
- def test_annotations_with_pbegin(self):
- class MyPTransform(PTransform):
- def expand(self, pcoll: PBegin):
- return pcoll | Map(lambda num: str(num))
-
- th = MyPTransform().get_type_hints()
- self.assertEqual(th.input_types, ((Any, ), {}))
- self.assertEqual(th.output_types, ((Any, ), {}))
-
- def test_annotations_with_pdone(self):
- class MyPTransform(PTransform):
- def expand(self, pcoll) -> PDone:
- return pcoll | Map(lambda num: str(num))
-
- th = MyPTransform().get_type_hints()
- self.assertEqual(th.input_types, ((Any, ), {}))
- self.assertEqual(th.output_types, ((Any, ), {}))
-
- def test_annotations_with_none_input(self):
- class MyPTransform(PTransform):
- def expand(self, pcoll: None) -> PCollection[str]:
- return pcoll | Map(lambda num: str(num))
-
- error_str = (
- r'This input type hint will be ignored and not used for '
- r'type-checking purposes. Typically, input type hints for a '
- r'PTransform are single (or nested) types wrapped by a '
- r'PCollection, or PBegin. Got: {} instead.'.format(None))
-
- with self.assertLogs(level='WARN') as log:
- th = MyPTransform().get_type_hints()
- self.assertIn(error_str, log.output[0])
- self.assertEqual(th.input_types, None)
- self.assertEqual(th.output_types, ((str, ), {}))
-
- def test_annotations_with_none_output(self):
- class MyPTransform(PTransform):
- def expand(self, pcoll) -> None:
- return pcoll | Map(lambda num: str(num))
-
- th = MyPTransform().get_type_hints()
- self.assertEqual(th.input_types, ((Any, ), {}))
- self.assertEqual(th.output_types, ((Any, ), {}))
-
- def test_annotations_with_arbitrary_output(self):
- class MyPTransform(PTransform):
- def expand(self, pcoll) -> str:
- return pcoll | Map(lambda num: str(num))
-
- th = MyPTransform().get_type_hints()
- self.assertEqual(th.input_types, ((Any, ), {}))
- self.assertEqual(th.output_types, None)
-
- def test_annotations_with_arbitrary_input_and_output(self):
- class MyPTransform(PTransform):
- def expand(self, pcoll: int) -> str:
- return pcoll | Map(lambda num: str(num))
-
- input_error_str = (
- r'This input type hint will be ignored and not used for '
- r'type-checking purposes. Typically, input type hints for a '
- r'PTransform are single (or nested) types wrapped by a '
- r'PCollection, or PBegin. Got: {} instead.'.format(int))
-
- output_error_str = (
- r'This output type hint will be ignored and not used for '
- r'type-checking purposes. Typically, output type hints for a '
- r'PTransform are single (or nested) types wrapped by a '
- r'PCollection, PDone, or None. Got: {} instead.'.format(str))
-
- with self.assertLogs(level='WARN') as log:
- th = MyPTransform().get_type_hints()
- self.assertIn(input_error_str, log.output[0])
- self.assertIn(output_error_str, log.output[1])
- self.assertEqual(th.input_types, None)
- self.assertEqual(th.output_types, None)
-
- def test_typing_module_annotations_are_converted_to_beam_annotations(self):
- class MyPTransform(PTransform):
- def expand(
- self, pcoll: PCollection[typing.Dict[str, str]]
- ) -> PCollection[typing.Dict[str, str]]:
- return pcoll
-
- th = MyPTransform().get_type_hints()
- self.assertEqual(th.input_types, ((typehints.Dict[str, str], ), {}))
- self.assertEqual(th.input_types, ((typehints.Dict[str, str], ), {}))
-
- def test_nested_typing_annotations_are_converted_to_beam_annotations(self):
- class MyPTransform(PTransform):
- def expand(self, pcoll:
- PCollection[typing.Union[int, typing.Any, typing.Dict[str, float]]]) \
- -> PCollection[typing.Union[int, typing.Any, typing.Dict[str, float]]]:
- return pcoll
-
- th = MyPTransform().get_type_hints()
- self.assertEqual(
- th.input_types,
- ((typehints.Union[int, typehints.Any, typehints.Dict[str,
- float]], ), {}))
- self.assertEqual(
- th.input_types,
- ((typehints.Union[int, typehints.Any, typehints.Dict[str,
- float]], ), {}))
-
- def test_mixed_annotations_are_converted_to_beam_annotations(self):
- class MyPTransform(PTransform):
- def expand(self, pcoll: typing.Any) -> typehints.Any:
- return pcoll
-
- th = MyPTransform().get_type_hints()
- self.assertEqual(th.input_types, ((typehints.Any, ), {}))
- self.assertEqual(th.input_types, ((typehints.Any, ), {}))
-
-
-if __name__ == '__main__':
- unittest.main()