This is an automated email from the ASF dual-hosted git repository.

tvalentyn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 8926c43  [BEAM-7372] cleanup py2 and py35 codepath from 
apache_beam/typehints (#14752)
8926c43 is described below

commit 8926c43f20ab19b8e445d519cf424e881365fc6c
Author: yoshiki.obata <[email protected]>
AuthorDate: Thu May 20 09:03:30 2021 +0900

    [BEAM-7372] cleanup py2 and py35 codepath from apache_beam/typehints 
(#14752)
---
 sdks/python/apache_beam/typehints/__init__.py      |   2 -
 sdks/python/apache_beam/typehints/decorators.py    | 135 +-----
 .../apache_beam/typehints/decorators_test.py       | 189 +++++++-
 .../apache_beam/typehints/decorators_test_py3.py   | 215 ---------
 .../typehints/native_type_compatibility.py         |   9 -
 .../typehints/native_type_compatibility_test.py    |   3 -
 sdks/python/apache_beam/typehints/opcodes.py       |   6 +-
 sdks/python/apache_beam/typehints/row_type.py      |   2 -
 sdks/python/apache_beam/typehints/schemas.py       |  13 +-
 sdks/python/apache_beam/typehints/schemas_test.py  |  18 +-
 .../apache_beam/typehints/sharded_key_type.py      |   8 +-
 .../apache_beam/typehints/sharded_key_type_test.py |   2 -
 .../apache_beam/typehints/trivial_inference.py     |  13 +-
 .../typehints/trivial_inference_test.py            |  41 +-
 .../typehints/trivial_inference_test_py3.py        |  54 ---
 sdks/python/apache_beam/typehints/typecheck.py     |  23 +-
 .../{typecheck_test_py3.py => typecheck_test.py}   |   2 -
 .../apache_beam/typehints/typed_pipeline_test.py   | 496 ++++++++++++++++++-
 .../typehints/typed_pipeline_test_py3.py           | 535 ---------------------
 sdks/python/apache_beam/typehints/typehints.py     |   9 +-
 .../python/apache_beam/typehints/typehints_test.py | 248 +++++++++-
 .../apache_beam/typehints/typehints_test_py3.py    | 274 -----------
 22 files changed, 958 insertions(+), 1339 deletions(-)

diff --git a/sdks/python/apache_beam/typehints/__init__.py 
b/sdks/python/apache_beam/typehints/__init__.py
index 23d0b40..e89afa1 100644
--- a/sdks/python/apache_beam/typehints/__init__.py
+++ b/sdks/python/apache_beam/typehints/__init__.py
@@ -17,8 +17,6 @@
 
 """A package defining the syntax and decorator semantics for type-hints."""
 
-from __future__ import absolute_import
-
 # pylint: disable=wildcard-import
 from apache_beam.typehints.typehints import *
 from apache_beam.typehints.decorators import *
diff --git a/sdks/python/apache_beam/typehints/decorators.py 
b/sdks/python/apache_beam/typehints/decorators.py
index 3dc9878..478869d 100644
--- a/sdks/python/apache_beam/typehints/decorators.py
+++ b/sdks/python/apache_beam/typehints/decorators.py
@@ -61,12 +61,6 @@ Example usage for type-hinting both arguments and return 
values::
   def int_to_str(a):
     return str(a)
 
-Type-hinting a function with arguments that unpack tuples are also supported
-(in Python 2 only). As an example, such a function would be defined as::
-
-  def foo((a, b)):
-    ...
-
 The valid type-hint for such as function looks like the following::
 
   @with_input_types(a=int, b=int)
@@ -85,17 +79,12 @@ defined, or before importing a module containing 
type-hinted functions.
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 import inspect
 import itertools
 import logging
 import sys
 import traceback
 import types
-from builtins import next
-from builtins import object
-from builtins import zip
 from typing import Any
 from typing import Callable
 from typing import Dict
@@ -115,11 +104,6 @@ from apache_beam.typehints.typehints import 
SimpleTypeHintError
 from apache_beam.typehints.typehints import check_constraint
 from apache_beam.typehints.typehints import validate_composite_type_param
 
-try:
-  import funcsigs  # Python 2 only.
-except ImportError:
-  funcsigs = None
-
 __all__ = [
     'disable_type_annotations',
     'no_annotations',
@@ -142,44 +126,6 @@ _ANY_VAR_POSITIONAL = typehints.Tuple[typehints.Any, ...]
 _ANY_VAR_KEYWORD = typehints.Dict[typehints.Any, typehints.Any]
 _disable_from_callable = False
 
-try:
-  _original_getfullargspec = inspect.getfullargspec
-  _use_full_argspec = True
-except AttributeError:  # Python 2
-  _original_getfullargspec = inspect.getargspec  # type: ignore
-  _use_full_argspec = False
-
-
-def getfullargspec(func):
-  # Python 3: Use get_signature instead.
-  assert sys.version_info < (3, ), 'This method should not be used in Python 3'
-  try:
-    return _original_getfullargspec(func)
-  except TypeError:
-    if isinstance(func, type):
-      argspec = getfullargspec(func.__init__)
-      del argspec.args[0]
-      return argspec
-    elif callable(func):
-      try:
-        return _original_getfullargspec(func.__call__)
-      except TypeError:
-        # Return an ArgSpec with at least one positional argument,
-        # and any number of other (positional or keyword) arguments
-        # whose name won't match any real argument.
-        # Arguments with the %unknown% prefix will be ignored in the type
-        # checking code.
-        if _use_full_argspec:
-          return inspect.FullArgSpec(['_'],
-                                     '__unknown__varargs',
-                                     '__unknown__keywords', (), [], {}, {})
-        else:  # Python 2
-          return inspect.ArgSpec(['_'],
-                                 '__unknown__varargs',
-                                 '__unknown__keywords', ())
-    else:
-      raise
-
 
 def get_signature(func):
   """Like inspect.signature(), but supports Py2 as well.
@@ -188,26 +134,18 @@ def get_signature(func):
   latter: 'the "self" parameter is always reported, even for bound methods'
   
https://github.com/python/cpython/blob/44f91c388a6f4da9ed3300df32ca290b8aa104ea/Lib/inspect.py#L1103
   """
-  # Fall back on funcsigs if inspect module doesn't have 'signature'; prefer
-  # inspect.signature over funcsigs.signature if both are available.
-  if hasattr(inspect, 'signature'):
-    inspect_ = inspect
-  else:
-    inspect_ = funcsigs
-
   try:
-    signature = inspect_.signature(func)
+    signature = inspect.signature(func)
   except ValueError:
     # Fall back on a catch-all signature.
     params = [
-        inspect_.Parameter('_', inspect_.Parameter.POSITIONAL_OR_KEYWORD),
-        inspect_.Parameter(
-            '__unknown__varargs', inspect_.Parameter.VAR_POSITIONAL),
-        inspect_.Parameter(
-            '__unknown__keywords', inspect_.Parameter.VAR_KEYWORD)
+        inspect.Parameter('_', inspect.Parameter.POSITIONAL_OR_KEYWORD),
+        inspect.Parameter(
+            '__unknown__varargs', inspect.Parameter.VAR_POSITIONAL),
+        inspect.Parameter('__unknown__keywords', inspect.Parameter.VAR_KEYWORD)
     ]
 
-    signature = inspect_.Signature(params)
+    signature = inspect.Signature(params)
 
   # This is a specialization to hint the first argument of certain builtins,
   # such as str.strip.
@@ -638,65 +576,6 @@ def _unpack_positional_arg_hints(arg, hint):
   return hint
 
 
-def getcallargs_forhints(func, *typeargs, **typekwargs):
-  """Like inspect.getcallargs, with support for declaring default args as Any.
-
-  In Python 2, understands that Tuple[] and an Any unpack.
-
-  Returns:
-    (Dict[str, Any]) A dictionary from arguments names to values.
-  """
-  if sys.version_info < (3, ):
-    return getcallargs_forhints_impl_py2(func, typeargs, typekwargs)
-  else:
-    return getcallargs_forhints_impl_py3(func, typeargs, typekwargs)
-
-
-def getcallargs_forhints_impl_py2(func, typeargs, typekwargs):
-  argspec = getfullargspec(func)
-  # Turn Tuple[x, y] into (x, y) so getcallargs can do the proper unpacking.
-  packed_typeargs = [
-      _unpack_positional_arg_hints(arg, hint)
-      for (arg, hint) in zip(argspec.args, typeargs)
-  ]
-  packed_typeargs += list(typeargs[len(packed_typeargs):])
-
-  # Monkeypatch inspect.getfullargspec to allow passing non-function objects.
-  # getfullargspec (getargspec on Python 2) are used by inspect.getcallargs.
-  # TODO(BEAM-5490): Reimplement getcallargs and stop relying on monkeypatch.
-  inspect.getargspec = getfullargspec
-  try:
-    callargs = inspect.getcallargs(func, *packed_typeargs, **typekwargs)  # 
pylint: disable=deprecated-method
-  except TypeError as e:
-    raise TypeCheckError(e)
-  finally:
-    # Revert monkey-patch.
-    inspect.getargspec = _original_getfullargspec
-
-  if argspec.defaults:
-    # Declare any default arguments to be Any.
-    for k, var in enumerate(reversed(argspec.args)):
-      if k >= len(argspec.defaults):
-        break
-      if callargs.get(var, None) is argspec.defaults[-k - 1]:
-        callargs[var] = typehints.Any
-  # Patch up varargs and keywords
-  if argspec.varargs:
-    # TODO(BEAM-8122): This will always assign _ANY_VAR_POSITIONAL. Should be
-    #   "callargs.get(...) or _ANY_VAR_POSITIONAL".
-    callargs[argspec.varargs] = typekwargs.get(
-        argspec.varargs, _ANY_VAR_POSITIONAL)
-
-  varkw = argspec.keywords
-  if varkw:
-    # TODO(robertwb): Consider taking the union of key and value types.
-    callargs[varkw] = typekwargs.get(varkw, _ANY_VAR_KEYWORD)
-
-  # TODO(BEAM-5878) Support kwonlyargs.
-
-  return callargs
-
-
 def _normalize_var_positional_hint(hint):
   """Converts a var_positional hint into Tuple[Union[<types>], ...] form.
 
@@ -743,7 +622,7 @@ def _normalize_var_keyword_hint(hint, arg_name):
     return typehints.Dict[str, typehints.Union[values]]
 
 
-def getcallargs_forhints_impl_py3(func, type_args, type_kwargs):
+def getcallargs_forhints(func, *type_args, **type_kwargs):
   """Bind type_args and type_kwargs to func.
 
   Works like inspect.getcallargs, with some modifications to support type hint
diff --git a/sdks/python/apache_beam/typehints/decorators_test.py 
b/sdks/python/apache_beam/typehints/decorators_test.py
index 85ca94f..faf0b55 100644
--- a/sdks/python/apache_beam/typehints/decorators_test.py
+++ b/sdks/python/apache_beam/typehints/decorators_test.py
@@ -19,24 +19,31 @@
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
-import re
+import functools
 import sys
+import typing
 import unittest
 
-import future.tests.base  # pylint: disable=unused-import
-
+from apache_beam import Map
 from apache_beam.typehints import Any
+from apache_beam.typehints import Dict
 from apache_beam.typehints import List
+from apache_beam.typehints import Tuple
+from apache_beam.typehints import TypeCheckError
+from apache_beam.typehints import TypeVariable
 from apache_beam.typehints import WithTypeHints
 from apache_beam.typehints import decorators
 from apache_beam.typehints import typehints
 
+T = TypeVariable('T')
+# Name is 'T' so it converts to a beam type with the same name.
+# mypy requires that the name of the variable match, so we must ignore this.
+T_typing = typing.TypeVar('T')  # type: ignore
+
 
 class IOTypeHintsTest(unittest.TestCase):
   def test_get_signature(self):
-    # Basic coverage only to make sure function works in Py2 and Py3.
+    # Basic coverage only to make sure function works.
     def fn(a, b=1, *c, **d):
       return a, b, c, d
 
@@ -55,7 +62,6 @@ class IOTypeHintsTest(unittest.TestCase):
     self.assertEqual(s.return_annotation, List[Any])
 
   def test_from_callable_without_annotations(self):
-    # Python 2 doesn't support annotations. See decorators_test_py3.py for 
that.
     def fn(a, b=None, *args, **kwargs):
       return a, b, args, kwargs
 
@@ -120,8 +126,7 @@ class IOTypeHintsTest(unittest.TestCase):
     origin = ''.join(
         decorators.IOTypeHints.empty().with_input_types(str).origin)
     self.assertRegex(origin, __name__)
-    # TODO: use self.assertNotRegex once py2 support is removed.
-    self.assertIsNone(re.search(r'\b_make_traceback', origin), msg=origin)
+    self.assertNotRegex(origin, r'\b_make_traceback')
 
   def test_origin(self):
     th = decorators.IOTypeHints.empty()
@@ -153,6 +158,140 @@ class IOTypeHintsTest(unittest.TestCase):
     th = th.with_defaults(th2)
     self.assertNotEqual(expected_id, id(th))
 
+  def test_from_callable(self):
+    def fn(
+        a: int,
+        b: str = '',
+        *args: Tuple[T],
+        foo: List[int],
+        **kwargs: Dict[str, str]) -> Tuple[Any, ...]:
+      return a, b, args, foo, kwargs
+
+    th = decorators.IOTypeHints.from_callable(fn)
+    self.assertEqual(
+        th.input_types, ((int, str, Tuple[T]), {
+            'foo': List[int], 'kwargs': Dict[str, str]
+        }))
+    self.assertEqual(th.output_types, ((Tuple[Any, ...], ), {}))
+
+  def test_from_callable_partial_annotations(self):
+    def fn(a: int, b=None, *args, foo: List[int], **kwargs):
+      return a, b, args, foo, kwargs
+
+    th = decorators.IOTypeHints.from_callable(fn)
+    self.assertEqual(
+        th.input_types,
+        ((int, Any, Tuple[Any, ...]), {
+            'foo': List[int], 'kwargs': Dict[Any, Any]
+        }))
+    self.assertEqual(th.output_types, ((Any, ), {}))
+
+  def test_from_callable_class(self):
+    class Class(object):
+      def __init__(self, unused_arg: int):
+        pass
+
+    th = decorators.IOTypeHints.from_callable(Class)
+    self.assertEqual(th.input_types, ((int, ), {}))
+    self.assertEqual(th.output_types, ((Class, ), {}))
+
+  def test_from_callable_method(self):
+    class Class(object):
+      def method(self, arg: T = None) -> None:
+        pass
+
+    th = decorators.IOTypeHints.from_callable(Class.method)
+    self.assertEqual(th.input_types, ((Any, T), {}))
+    self.assertEqual(th.output_types, ((None, ), {}))
+
+    th = decorators.IOTypeHints.from_callable(Class().method)
+    self.assertEqual(th.input_types, ((T, ), {}))
+    self.assertEqual(th.output_types, ((None, ), {}))
+
+  def test_from_callable_convert_to_beam_types(self):
+    def fn(
+        a: typing.List[int],
+        b: str = '',
+        *args: typing.Tuple[T_typing],
+        foo: typing.List[int],
+        **kwargs: typing.Dict[str, str]) -> typing.Tuple[typing.Any, ...]:
+      return a, b, args, foo, kwargs
+
+    th = decorators.IOTypeHints.from_callable(fn)
+    self.assertEqual(
+        th.input_types,
+        ((List[int], str, Tuple[T]), {
+            'foo': List[int], 'kwargs': Dict[str, str]
+        }))
+    self.assertEqual(th.output_types, ((Tuple[Any, ...], ), {}))
+
+  def test_from_callable_partial(self):
+    def fn(a: int) -> int:
+      return a
+
+    # functools.partial objects don't have __name__ attributes by default.
+    fn = functools.partial(fn, 1)
+    th = decorators.IOTypeHints.from_callable(fn)
+    self.assertRegex(th.debug_str(), r'unknown')
+
+  def test_getcallargs_forhints(self):
+    def fn(
+        a: int,
+        b: str = '',
+        *args: Tuple[T],
+        foo: List[int],
+        **kwargs: Dict[str, str]) -> Tuple[Any, ...]:
+      return a, b, args, foo, kwargs
+
+    callargs = decorators.getcallargs_forhints(fn, float, foo=List[str])
+    self.assertDictEqual(
+        callargs,
+        {
+            'a': float,
+            'b': str,
+            'args': Tuple[T],
+            'foo': List[str],
+            'kwargs': Dict[str, str]
+        })
+
+  def test_getcallargs_forhints_default_arg(self):
+    # Default args are not necessarily types, so they should be ignored.
+    def fn(a=List[int], b=None, *args, foo=(), **kwargs) -> Tuple[Any, ...]:
+      return a, b, args, foo, kwargs
+
+    callargs = decorators.getcallargs_forhints(fn)
+    self.assertDictEqual(
+        callargs,
+        {
+            'a': Any,
+            'b': Any,
+            'args': Tuple[Any, ...],
+            'foo': Any,
+            'kwargs': Dict[Any, Any]
+        })
+
+  def test_getcallargs_forhints_missing_arg(self):
+    def fn(a, b=None, *args, foo, **kwargs):
+      return a, b, args, foo, kwargs
+
+    with self.assertRaisesRegex(decorators.TypeCheckError, "missing.*'a'"):
+      decorators.getcallargs_forhints(fn, foo=List[int])
+    with self.assertRaisesRegex(decorators.TypeCheckError, "missing.*'foo'"):
+      decorators.getcallargs_forhints(fn, 5)
+
+  def test_origin(self):
+    def annotated(e: str) -> str:
+      return e
+
+    t = Map(annotated)
+    th = t.get_type_hints()
+    th = th.with_input_types(str)
+    self.assertRegex(th.debug_str(), r'with_input_types')
+    th = th.with_output_types(str)
+    self.assertRegex(
+        th.debug_str(),
+        r'(?s)with_output_types.*with_input_types.*Map.annotated')
+
 
 class WithTypeHintsTest(unittest.TestCase):
   def test_get_type_hints_no_settings(self):
@@ -231,5 +370,37 @@ class DecoratorsTest(unittest.TestCase):
     self.assertTrue(decorators._disable_from_callable)
 
 
+class DecoratorsTest(unittest.TestCase):
+  def test_no_annotations(self):
+    def fn(a: int) -> int:
+      return a
+
+    with self.assertRaisesRegex(TypeCheckError,
+                                r'requires .*int.* but got .*str'):
+      _ = ['a', 'b', 'c'] | Map(fn)
+
+    # Same pipeline doesn't raise without annotations on fn.
+    fn = decorators.no_annotations(fn)
+    _ = ['a', 'b', 'c'] | Map(fn)
+
+
+class DecoratorsTest(unittest.TestCase):
+  def test_no_annotations(self):
+    def fn(a: int) -> int:
+      return a
+
+    _ = [1, 2, 3] | Map(fn)  # Doesn't raise - correct types.
+
+    with self.assertRaisesRegex(TypeCheckError,
+                                r'requires .*int.* but got .*str'):
+      _ = ['a', 'b', 'c'] | Map(fn)
+
+    @decorators.no_annotations
+    def fn2(a: int) -> int:
+      return a
+
+    _ = ['a', 'b', 'c'] | Map(fn2)  # Doesn't raise - no input type hints.
+
+
 if __name__ == '__main__':
   unittest.main()
diff --git a/sdks/python/apache_beam/typehints/decorators_test_py3.py 
b/sdks/python/apache_beam/typehints/decorators_test_py3.py
deleted file mode 100644
index 024a0d2..0000000
--- a/sdks/python/apache_beam/typehints/decorators_test_py3.py
+++ /dev/null
@@ -1,215 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Tests for decorators module with Python 3 syntax not supported by 2.7."""
-
-# pytype: skip-file
-
-from __future__ import absolute_import
-
-import functools
-import typing
-import unittest
-
-# patches unittest.TestCase to be python3 compatible
-import future.tests.base  # pylint: disable=unused-import
-
-from apache_beam import Map
-from apache_beam.typehints import Any
-from apache_beam.typehints import Dict
-from apache_beam.typehints import List
-from apache_beam.typehints import Tuple
-from apache_beam.typehints import TypeCheckError
-from apache_beam.typehints import TypeVariable
-from apache_beam.typehints import decorators
-
-T = TypeVariable('T')
-# Name is 'T' so it converts to a beam type with the same name.
-# mypy requires that the name of the variable match, so we must ignore this.
-T_typing = typing.TypeVar('T')  # type: ignore
-
-
-class IOTypeHintsTest(unittest.TestCase):
-  def test_from_callable(self):
-    def fn(
-        a: int,
-        b: str = '',
-        *args: Tuple[T],
-        foo: List[int],
-        **kwargs: Dict[str, str]) -> Tuple[Any, ...]:
-      return a, b, args, foo, kwargs
-
-    th = decorators.IOTypeHints.from_callable(fn)
-    self.assertEqual(
-        th.input_types, ((int, str, Tuple[T]), {
-            'foo': List[int], 'kwargs': Dict[str, str]
-        }))
-    self.assertEqual(th.output_types, ((Tuple[Any, ...], ), {}))
-
-  def test_from_callable_partial_annotations(self):
-    def fn(a: int, b=None, *args, foo: List[int], **kwargs):
-      return a, b, args, foo, kwargs
-
-    th = decorators.IOTypeHints.from_callable(fn)
-    self.assertEqual(
-        th.input_types,
-        ((int, Any, Tuple[Any, ...]), {
-            'foo': List[int], 'kwargs': Dict[Any, Any]
-        }))
-    self.assertEqual(th.output_types, ((Any, ), {}))
-
-  def test_from_callable_class(self):
-    class Class(object):
-      def __init__(self, unused_arg: int):
-        pass
-
-    th = decorators.IOTypeHints.from_callable(Class)
-    self.assertEqual(th.input_types, ((int, ), {}))
-    self.assertEqual(th.output_types, ((Class, ), {}))
-
-  def test_from_callable_method(self):
-    class Class(object):
-      def method(self, arg: T = None) -> None:
-        pass
-
-    th = decorators.IOTypeHints.from_callable(Class.method)
-    self.assertEqual(th.input_types, ((Any, T), {}))
-    self.assertEqual(th.output_types, ((None, ), {}))
-
-    th = decorators.IOTypeHints.from_callable(Class().method)
-    self.assertEqual(th.input_types, ((T, ), {}))
-    self.assertEqual(th.output_types, ((None, ), {}))
-
-  def test_from_callable_convert_to_beam_types(self):
-    def fn(
-        a: typing.List[int],
-        b: str = '',
-        *args: typing.Tuple[T_typing],
-        foo: typing.List[int],
-        **kwargs: typing.Dict[str, str]) -> typing.Tuple[typing.Any, ...]:
-      return a, b, args, foo, kwargs
-
-    th = decorators.IOTypeHints.from_callable(fn)
-    self.assertEqual(
-        th.input_types,
-        ((List[int], str, Tuple[T]), {
-            'foo': List[int], 'kwargs': Dict[str, str]
-        }))
-    self.assertEqual(th.output_types, ((Tuple[Any, ...], ), {}))
-
-  def test_from_callable_partial(self):
-    def fn(a: int) -> int:
-      return a
-
-    # functools.partial objects don't have __name__ attributes by default.
-    fn = functools.partial(fn, 1)
-    th = decorators.IOTypeHints.from_callable(fn)
-    self.assertRegex(th.debug_str(), r'unknown')
-
-  def test_getcallargs_forhints(self):
-    def fn(
-        a: int,
-        b: str = '',
-        *args: Tuple[T],
-        foo: List[int],
-        **kwargs: Dict[str, str]) -> Tuple[Any, ...]:
-      return a, b, args, foo, kwargs
-
-    callargs = decorators.getcallargs_forhints(fn, float, foo=List[str])
-    self.assertDictEqual(
-        callargs,
-        {
-            'a': float,
-            'b': str,
-            'args': Tuple[T],
-            'foo': List[str],
-            'kwargs': Dict[str, str]
-        })
-
-  def test_getcallargs_forhints_default_arg(self):
-    # Default args are not necessarily types, so they should be ignored.
-    def fn(a=List[int], b=None, *args, foo=(), **kwargs) -> Tuple[Any, ...]:
-      return a, b, args, foo, kwargs
-
-    callargs = decorators.getcallargs_forhints(fn)
-    self.assertDictEqual(
-        callargs,
-        {
-            'a': Any,
-            'b': Any,
-            'args': Tuple[Any, ...],
-            'foo': Any,
-            'kwargs': Dict[Any, Any]
-        })
-
-  def test_getcallargs_forhints_missing_arg(self):
-    def fn(a, b=None, *args, foo, **kwargs):
-      return a, b, args, foo, kwargs
-
-    with self.assertRaisesRegex(decorators.TypeCheckError, "missing.*'a'"):
-      decorators.getcallargs_forhints(fn, foo=List[int])
-    with self.assertRaisesRegex(decorators.TypeCheckError, "missing.*'foo'"):
-      decorators.getcallargs_forhints(fn, 5)
-
-  def test_origin(self):
-    def annotated(e: str) -> str:
-      return e
-
-    t = Map(annotated)
-    th = t.get_type_hints()
-    th = th.with_input_types(str)
-    self.assertRegex(th.debug_str(), r'with_input_types')
-    th = th.with_output_types(str)
-    self.assertRegex(
-        th.debug_str(),
-        r'(?s)with_output_types.*with_input_types.*Map.annotated')
-
-
-class DecoratorsTest(unittest.TestCase):
-  def test_no_annotations(self):
-    def fn(a: int) -> int:
-      return a
-
-    with self.assertRaisesRegex(TypeCheckError,
-                                r'requires .*int.* but got .*str'):
-      _ = ['a', 'b', 'c'] | Map(fn)
-
-    # Same pipeline doesn't raise without annotations on fn.
-    fn = decorators.no_annotations(fn)
-    _ = ['a', 'b', 'c'] | Map(fn)
-
-
-class DecoratorsTest(unittest.TestCase):
-  def test_no_annotations(self):
-    def fn(a: int) -> int:
-      return a
-
-    _ = [1, 2, 3] | Map(fn)  # Doesn't raise - correct types.
-
-    with self.assertRaisesRegex(TypeCheckError,
-                                r'requires .*int.* but got .*str'):
-      _ = ['a', 'b', 'c'] | Map(fn)
-
-    @decorators.no_annotations
-    def fn2(a: int) -> int:
-      return a
-
-    _ = ['a', 'b', 'c'] | Map(fn2)  # Doesn't raise - no input type hints.
-
-
-if __name__ == '__main__':
-  unittest.main()
diff --git a/sdks/python/apache_beam/typehints/native_type_compatibility.py 
b/sdks/python/apache_beam/typehints/native_type_compatibility.py
index d77727e..3d69cd6 100644
--- a/sdks/python/apache_beam/typehints/native_type_compatibility.py
+++ b/sdks/python/apache_beam/typehints/native_type_compatibility.py
@@ -19,13 +19,10 @@
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 import collections
 import logging
 import sys
 import typing
-from builtins import next
 
 from apache_beam.typehints import typehints
 
@@ -134,12 +131,6 @@ def _match_is_union(user_type):
   if user_type is typing.Union:
     return True
 
-  try:  # Python 3.5.2
-    if isinstance(user_type, typing.UnionMeta):
-      return True
-  except AttributeError:
-    pass
-
   try:  # Python 3.5.4+, or Python 2.7.14+ with typing 3.64
     return user_type.__origin__ is typing.Union
   except AttributeError:
diff --git 
a/sdks/python/apache_beam/typehints/native_type_compatibility_test.py 
b/sdks/python/apache_beam/typehints/native_type_compatibility_test.py
index bb38eef..577a6ee 100644
--- a/sdks/python/apache_beam/typehints/native_type_compatibility_test.py
+++ b/sdks/python/apache_beam/typehints/native_type_compatibility_test.py
@@ -19,8 +19,6 @@
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 import sys
 import typing
 import unittest
@@ -70,7 +68,6 @@ class NativeTypeCompatibilityTest(unittest.TestCase):
             bytes, typing.Union[int, bytes, float]]]],
          typehints.Tuple[bytes, typehints.List[typehints.Tuple[
              bytes, typehints.Union[int, bytes, float]]]]),
-        # TODO(BEAM-7713): This case seems to fail on Py3.5.2 but not 3.5.4.
         ('arbitrary-length tuple', typing.Tuple[int, ...],
          typehints.Tuple[int, ...]),
         ('flat alias', _TestFlatAlias, typehints.Tuple[bytes, float]),  # 
type: ignore[misc]
diff --git a/sdks/python/apache_beam/typehints/opcodes.py 
b/sdks/python/apache_beam/typehints/opcodes.py
index 56b2b53..b8aa75f 100644
--- a/sdks/python/apache_beam/typehints/opcodes.py
+++ b/sdks/python/apache_beam/typehints/opcodes.py
@@ -29,16 +29,12 @@ For internal use only; no backwards-compatibility 
guarantees.
 """
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 import inspect
 import logging
 import sys
 import types
 from functools import reduce
 
-from past.builtins import unicode
-
 from apache_beam.typehints import row_type
 from apache_beam.typehints import typehints
 from apache_beam.typehints.trivial_inference import BoundMethod
@@ -160,7 +156,7 @@ binary_subtract = inplace_subtract = symmetric_binary_op
 def binary_subscr(state, unused_arg):
   index = state.stack.pop()
   base = Const.unwrap(state.stack.pop())
-  if base in (str, unicode):
+  if base is str:
     out = base
   elif (isinstance(index, Const) and isinstance(index.value, int) and
         isinstance(base, typehints.IndexableTypeConstraint)):
diff --git a/sdks/python/apache_beam/typehints/row_type.py 
b/sdks/python/apache_beam/typehints/row_type.py
index f301aed..f7098fb 100644
--- a/sdks/python/apache_beam/typehints/row_type.py
+++ b/sdks/python/apache_beam/typehints/row_type.py
@@ -17,8 +17,6 @@
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 from apache_beam.typehints import typehints
 
 
diff --git a/sdks/python/apache_beam/typehints/schemas.py 
b/sdks/python/apache_beam/typehints/schemas.py
index 5daf68a..eae185d 100644
--- a/sdks/python/apache_beam/typehints/schemas.py
+++ b/sdks/python/apache_beam/typehints/schemas.py
@@ -30,7 +30,7 @@ Imposes a mapping between common Python types and Beam 
portable schemas
   np.float64  <-----> DOUBLE
   float       ------> DOUBLE
   bool        <-----> BOOLEAN
-  str/unicode <-----> STRING
+  str         <-----> STRING
   bytes       <-----> BYTES
   ByteString  ------> BYTES
   Timestamp   <-----> LogicalType(urn="beam:logical_type:micros_instant:v1")
@@ -51,8 +51,6 @@ wrapping the type in :code:`Optional`.
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 from typing import Any
 from typing import ByteString
 from typing import Generic
@@ -64,7 +62,6 @@ from typing import TypeVar
 from uuid import uuid4
 
 import numpy as np
-from past.builtins import unicode
 
 from apache_beam.portability.api import schema_pb2
 from apache_beam.typehints import row_type
@@ -108,7 +105,7 @@ _PRIMITIVES = (
     (np.int64, schema_pb2.INT64),
     (np.float32, schema_pb2.FLOAT),
     (np.float64, schema_pb2.DOUBLE),
-    (unicode, schema_pb2.STRING),
+    (str, schema_pb2.STRING),
     (bool, schema_pb2.BOOLEAN),
     (bytes, schema_pb2.BYTES),
 )
@@ -142,7 +139,7 @@ def named_fields_to_schema(names_and_types):
 
 
 def named_fields_from_schema(
-    schema):  # (schema_pb2.Schema) -> typing.List[typing.Tuple[unicode, type]]
+    schema):  # (schema_pb2.Schema) -> typing.List[typing.Tuple[str, type]]
   return [(field.name, typing_from_runner_api(field.type))
           for field in schema.fields]
 
@@ -296,7 +293,7 @@ def schema_from_element_type(element_type):  # (type) -> 
schema_pb2.Schema
 
 
 def named_fields_from_element_type(
-    element_type):  # (type) -> typing.List[typing.Tuple[unicode, type]]
+    element_type):  # (type) -> typing.List[typing.Tuple[str, type]]
   return named_fields_from_schema(schema_from_element_type(element_type))
 
 
@@ -332,7 +329,7 @@ class LogicalType(Generic[LanguageT, RepresentationT, 
ArgT]):
 
   @classmethod
   def urn(cls):
-    # type: () -> unicode
+    # type: () -> str
 
     """Return the URN used to identify this logical type"""
     raise NotImplementedError()
diff --git a/sdks/python/apache_beam/typehints/schemas_test.py 
b/sdks/python/apache_beam/typehints/schemas_test.py
index c28f2c5..c662bac 100644
--- a/sdks/python/apache_beam/typehints/schemas_test.py
+++ b/sdks/python/apache_beam/typehints/schemas_test.py
@@ -19,9 +19,8 @@
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 import itertools
+import pickle
 import unittest
 from typing import ByteString
 from typing import List
@@ -31,8 +30,6 @@ from typing import Optional
 from typing import Sequence
 
 import numpy as np
-from future.moves import pickle
-from past.builtins import unicode
 
 from apache_beam.portability.api import schema_pb2
 from apache_beam.typehints.schemas import named_tuple_from_schema
@@ -58,7 +55,6 @@ class SchemaTest(unittest.TestCase):
         np.int64,
         np.float32,
         np.float64,
-        unicode,
         bool,
         bytes,
         str,
@@ -85,10 +81,8 @@ class SchemaTest(unittest.TestCase):
             'ComplexSchema',
             [
                 ('id', np.int64),
-                ('name', unicode),
-                (
-                    'optional_map',
-                    Optional[Mapping[unicode, Optional[np.float64]]]),
+                ('name', str),
+                ('optional_map', Optional[Mapping[str, Optional[np.float64]]]),
                 ('optional_array', Optional[Sequence[np.float32]]),
                 ('array_optional', Sequence[Optional[bool]]),
                 ('timestamp', Timestamp),
@@ -215,9 +209,9 @@ class SchemaTest(unittest.TestCase):
     MyCuteClass = NamedTuple(
         'MyCuteClass',
         [
-            ('name', unicode),
+            ('name', str),
             ('age', Optional[int]),
-            ('interests', List[unicode]),
+            ('interests', List[str]),
             ('height', float),
             ('blob', ByteString),
         ])
@@ -273,7 +267,7 @@ class SchemaTest(unittest.TestCase):
 
   def test_user_type_annotated_with_id_after_conversion(self):
     MyCuteClass = NamedTuple('MyCuteClass', [
-        ('name', unicode),
+        ('name', str),
     ])
     self.assertFalse(hasattr(MyCuteClass, '_beam_schema_id'))
 
diff --git a/sdks/python/apache_beam/typehints/sharded_key_type.py 
b/sdks/python/apache_beam/typehints/sharded_key_type.py
index 72b198d..2461141 100644
--- a/sdks/python/apache_beam/typehints/sharded_key_type.py
+++ b/sdks/python/apache_beam/typehints/sharded_key_type.py
@@ -28,17 +28,13 @@ Mostly for internal use.
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
-from six import with_metaclass
-
 from apache_beam import coders
 from apache_beam.typehints import typehints
 from apache_beam.utils.sharded_key import ShardedKey
 
 
-class ShardedKeyTypeConstraint(with_metaclass(typehints.GetitemConstructor,
-                                              typehints.TypeConstraint)):
+class ShardedKeyTypeConstraint(typehints.TypeConstraint,
+                               metaclass=typehints.GetitemConstructor):
   def __init__(self, key_type):
     typehints.validate_composite_type_param(
         key_type, error_msg_prefix='Parameter to ShardedKeyType hint')
diff --git a/sdks/python/apache_beam/typehints/sharded_key_type_test.py 
b/sdks/python/apache_beam/typehints/sharded_key_type_test.py
index dce065b..922fcf9 100644
--- a/sdks/python/apache_beam/typehints/sharded_key_type_test.py
+++ b/sdks/python/apache_beam/typehints/sharded_key_type_test.py
@@ -19,8 +19,6 @@
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 from apache_beam.typehints import Tuple
 from apache_beam.typehints import typehints
 from apache_beam.typehints.sharded_key_type import ShardedKeyType
diff --git a/sdks/python/apache_beam/typehints/trivial_inference.py 
b/sdks/python/apache_beam/typehints/trivial_inference.py
index 2e7f861..cc6534d 100644
--- a/sdks/python/apache_beam/typehints/trivial_inference.py
+++ b/sdks/python/apache_beam/typehints/trivial_inference.py
@@ -21,9 +21,7 @@ For internal use only; no backwards-compatibility guarantees.
 """
 # pytype: skip-file
 
-from __future__ import absolute_import
-from __future__ import print_function
-
+import builtins
 import collections
 import dis
 import inspect
@@ -31,8 +29,6 @@ import pprint
 import sys
 import traceback
 import types
-from builtins import object
-from builtins import zip
 from functools import reduce
 
 from apache_beam import pvalue
@@ -40,13 +36,6 @@ from apache_beam.typehints import Any
 from apache_beam.typehints import row_type
 from apache_beam.typehints import typehints
 
-# pylint: disable=wrong-import-order, wrong-import-position, ungrouped-imports
-try:  # Python 2
-  import __builtin__ as builtins
-except ImportError:  # Python 3
-  import builtins  # type: ignore
-# pylint: enable=wrong-import-order, wrong-import-position, ungrouped-imports
-
 
 class TypeInferenceError(ValueError):
   """Error to raise when type inference failed."""
diff --git a/sdks/python/apache_beam/typehints/trivial_inference_test.py 
b/sdks/python/apache_beam/typehints/trivial_inference_test.py
index 7f54a40..e03e919 100644
--- a/sdks/python/apache_beam/typehints/trivial_inference_test.py
+++ b/sdks/python/apache_beam/typehints/trivial_inference_test.py
@@ -19,9 +19,6 @@
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
-import sys
 import types
 import unittest
 
@@ -39,6 +36,21 @@ class TrivialInferenceTest(unittest.TestCase):
         expected,
         trivial_inference.infer_return_type(f, inputs, debug=True, 
depth=depth))
 
+  def testBuildListUnpack(self):
+    # Lambda uses BUILD_LIST_UNPACK opcode in Python 3.
+    self.assertReturnType(
+        typehints.List[int],
+        lambda _list: [*_list, *_list, *_list], [typehints.List[int]])
+
+  def testBuildTupleUnpack(self):
+    # Lambda uses BUILD_TUPLE_UNPACK opcode in Python 3.
+    # yapf: disable
+    self.assertReturnType(
+        typehints.Tuple[int, str, str],
+        lambda _list1, _list2: (*_list1, *_list2, *_list2),
+        [typehints.List[int], typehints.List[str]])
+    # yapf: enable
+
   def testIdentity(self):
     self.assertReturnType(int, lambda x: x, [int])
 
@@ -74,11 +86,9 @@ class TrivialInferenceTest(unittest.TestCase):
     self.assertReturnType(str, lambda v: v[::-1], [str])
     self.assertReturnType(typehints.Any, lambda v: v[::-1], [typehints.Any])
     self.assertReturnType(typehints.Any, lambda v: v[::-1], [object])
-    if sys.version_info >= (3, ):
-      # Test binary_subscr on a slice of a Const. On Py2.7 this will use the
-      # unsupported opcode SLICE+0.
-      test_list = ['a', 'b']
-      self.assertReturnType(typehints.List[str], lambda: test_list[:], [])
+    # Test binary_subscr on a slice of a Const.
+    test_list = ['a', 'b']
+    self.assertReturnType(typehints.List[str], lambda: test_list[:], [])
 
   def testUnpack(self):
     def reverse(a_b):
@@ -104,7 +114,6 @@ class TrivialInferenceTest(unittest.TestCase):
     self.assertReturnType(
         any_tuple, reverse, [trivial_inference.Const((1, 2, 3))])
 
-  @unittest.skipIf(sys.version_info < (3, ), 'BUILD_MAP better in Python 3')
   def testBuildMap(self):
     self.assertReturnType(
         typehints.Dict[typehints.Any, typehints.Any],
@@ -170,11 +179,7 @@ class TrivialInferenceTest(unittest.TestCase):
     self.assertReturnType(
         typehints.List[typehints.Union[int, float]],
         lambda xs: [x for x in xs], [typehints.Tuple[int, float]])
-    if sys.version_info[:2] == (3, 5):
-      # A better result requires implementing the MAKE_CLOSURE opcode.
-      expected = typehints.Any
-    else:
-      expected = typehints.List[typehints.Tuple[str, int]]
+    expected = typehints.List[typehints.Tuple[str, int]]
     self.assertReturnType(
         expected,
         lambda kvs: [(kvs[0], v) for v in kvs[1]],
@@ -269,11 +274,7 @@ class TrivialInferenceTest(unittest.TestCase):
 
   def testDictComprehension(self):
     fields = []
-    if sys.version_info >= (3, 6):
-      expected_type = typehints.Dict[typehints.Any, typehints.Any]
-    else:
-      # For Python 2, just ensure it doesn't crash.
-      expected_type = typehints.Any
+    expected_type = typehints.Dict[typehints.Any, typehints.Any]
     self.assertReturnType(
         expected_type, lambda row: {f: row[f]
                                     for f in fields}, [typehints.Any])
@@ -318,7 +319,6 @@ class TrivialInferenceTest(unittest.TestCase):
         x2,
         _list: fn(x1, x2, *_list), [str, typehints.List[int]])
 
-  @unittest.skipIf(sys.version_info < (3, 6), 'CALL_FUNCTION_EX is new in 3.6')
   def testCallFunctionEx(self):
     # Test when fn arguments are built using BUiLD_LIST.
     def fn(*args):
@@ -329,7 +329,6 @@ class TrivialInferenceTest(unittest.TestCase):
         lambda x1,
         x2: fn(*[x1, x2]), [str, float])
 
-  @unittest.skipIf(sys.version_info < (3, 6), 'CALL_FUNCTION_EX is new in 3.6')
   def testCallFunctionExKwargs(self):
     def fn(x1, x2, **unused_kwargs):
       return x1, x2
diff --git a/sdks/python/apache_beam/typehints/trivial_inference_test_py3.py 
b/sdks/python/apache_beam/typehints/trivial_inference_test_py3.py
deleted file mode 100644
index e52a7ec..0000000
--- a/sdks/python/apache_beam/typehints/trivial_inference_test_py3.py
+++ /dev/null
@@ -1,54 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Tests for apache_beam.typehints.trivial_inference that use Python 3 syntax.
-"""
-
-# pytype: skip-file
-
-from __future__ import absolute_import
-
-import unittest
-
-from apache_beam.typehints import trivial_inference
-from apache_beam.typehints import typehints
-
-
-class TrivialInferenceTest(unittest.TestCase):
-  def assertReturnType(self, expected, f, inputs=(), depth=5):
-    self.assertEqual(
-        expected,
-        trivial_inference.infer_return_type(f, inputs, debug=True, 
depth=depth))
-
-  def testBuildListUnpack(self):
-    # Lambda uses BUILD_LIST_UNPACK opcode in Python 3.
-    self.assertReturnType(
-        typehints.List[int],
-        lambda _list: [*_list, *_list, *_list], [typehints.List[int]])
-
-  def testBuildTupleUnpack(self):
-    # Lambda uses BUILD_TUPLE_UNPACK opcode in Python 3.
-    # yapf: disable
-    self.assertReturnType(
-        typehints.Tuple[int, str, str],
-        lambda _list1, _list2: (*_list1, *_list2, *_list2),
-        [typehints.List[int], typehints.List[str]])
-    # yapf: enable
-
-
-if __name__ == '__main__':
-  unittest.main()
diff --git a/sdks/python/apache_beam/typehints/typecheck.py 
b/sdks/python/apache_beam/typehints/typecheck.py
index d44d0de..6c4ba25 100644
--- a/sdks/python/apache_beam/typehints/typecheck.py
+++ b/sdks/python/apache_beam/typehints/typecheck.py
@@ -22,15 +22,11 @@ For internal use only; no backwards-compatibility 
guarantees.
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 import collections
 import inspect
+import sys
 import types
 
-from future.utils import raise_with_traceback
-from past.builtins import unicode
-
 from apache_beam import pipeline
 from apache_beam.pvalue import TaggedOutput
 from apache_beam.transforms import core
@@ -93,7 +89,8 @@ class OutputCheckWrapperDoFn(AbstractDoFnWrapper):
       error_msg = (
           'Runtime type violation detected within ParDo(%s): '
           '%s' % (self.full_label, e))
-      raise_with_traceback(TypeCheckError(error_msg))
+      _, _, tb = sys.exc_info()
+      raise TypeCheckError(error_msg).with_traceback(tb)
     else:
       return self._check_type(result)
 
@@ -102,7 +99,7 @@ class OutputCheckWrapperDoFn(AbstractDoFnWrapper):
     if output is None:
       return output
 
-    elif isinstance(output, (dict, bytes, str, unicode)):
+    elif isinstance(output, (dict, bytes, str)):
       object_type = type(output).__name__
       raise TypeCheckError(
           'Returning a %s from a ParDo or FlatMap is '
@@ -184,13 +181,15 @@ class TypeCheckWrapperDoFn(AbstractDoFnWrapper):
     try:
       check_constraint(type_constraint, datum)
     except CompositeTypeHintError as e:
-      raise_with_traceback(TypeCheckError(e.args[0]))
+      _, _, tb = sys.exc_info()
+      raise TypeCheckError(e.args[0]).with_traceback(tb)
     except SimpleTypeHintError:
       error_msg = (
           "According to type-hint expected %s should be of type %s. "
           "Instead, received '%s', an instance of type %s." %
           (datum_type, type_constraint, datum, type(datum)))
-      raise_with_traceback(TypeCheckError(error_msg))
+      _, _, tb = sys.exc_info()
+      raise TypeCheckError(error_msg).with_traceback(tb)
 
 
 class TypeCheckCombineFn(core.CombineFn):
@@ -220,7 +219,8 @@ class TypeCheckCombineFn(core.CombineFn):
         error_msg = (
             'Runtime type violation detected within %s: '
             '%s' % (self._label, e))
-        raise_with_traceback(TypeCheckError(error_msg))
+        _, _, tb = sys.exc_info()
+        raise TypeCheckError(error_msg).with_traceback(tb)
     return self._combinefn.add_input(accumulator, element, *args, **kwargs)
 
   def merge_accumulators(self, accumulators, *args, **kwargs):
@@ -239,7 +239,8 @@ class TypeCheckCombineFn(core.CombineFn):
         error_msg = (
             'Runtime type violation detected within %s: '
             '%s' % (self._label, e))
-        raise_with_traceback(TypeCheckError(error_msg))
+        _, _, tb = sys.exc_info()
+        raise TypeCheckError(error_msg).with_traceback(tb)
     return result
 
   def teardown(self, *args, **kwargs):
diff --git a/sdks/python/apache_beam/typehints/typecheck_test_py3.py 
b/sdks/python/apache_beam/typehints/typecheck_test.py
similarity index 99%
rename from sdks/python/apache_beam/typehints/typecheck_test_py3.py
rename to sdks/python/apache_beam/typehints/typecheck_test.py
index 9b5e487..b6897b5 100644
--- a/sdks/python/apache_beam/typehints/typecheck_test_py3.py
+++ b/sdks/python/apache_beam/typehints/typecheck_test.py
@@ -23,8 +23,6 @@ See additional runtime_type_check=True tests in 
ptransform_test.py.
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 import os
 import tempfile
 import unittest
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test.py 
b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
index d9cea6b..b880e49 100644
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test.py
+++ b/sdks/python/apache_beam/typehints/typed_pipeline_test.py
@@ -19,15 +19,10 @@
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 import sys
 import typing
 import unittest
 
-# patches unittest.TestCase to be python3 compatible
-import future.tests.base  # pylint: disable=unused-import
-
 import apache_beam as beam
 from apache_beam import pvalue
 from apache_beam import typehints
@@ -44,6 +39,10 @@ from apache_beam.typehints.decorators import get_signature
 
 
 class MainInputTest(unittest.TestCase):
+  def assertStartswith(self, msg, prefix):
+    self.assertTrue(
+        msg.startswith(prefix), '"%s" does not start with "%s"' % (msg, 
prefix))
+
   def test_bad_main_input(self):
     @typehints.with_input_types(str, int)
     def repeat(s, times):
@@ -66,7 +65,7 @@ class MainInputTest(unittest.TestCase):
     self.assertEqual([1, 16, 256], sorted(result))
 
   @unittest.skipIf(
-      sys.version_info.major >= 3 and sys.version_info < (3, 7, 0),
+      sys.version_info < (3, 7, 0),
       'Function signatures for builtins are not available in Python 3 before '
       'version 3.7.')
   def test_non_function_fails(self):
@@ -100,18 +99,57 @@ class MainInputTest(unittest.TestCase):
                                 r'requires.*int.*got.*str'):
       [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn()))
 
+  def test_typed_dofn_method(self):
+    class MyDoFn(beam.DoFn):
+      def process(self, element: int) -> typehints.Tuple[str]:
+        return tuple(str(element))
+
+    result = [1, 2, 3] | beam.ParDo(MyDoFn())
+    self.assertEqual(['1', '2', '3'], sorted(result))
+
+    with self.assertRaisesRegex(typehints.TypeCheckError,
+                                r'requires.*int.*got.*str'):
+      _ = ['a', 'b', 'c'] | beam.ParDo(MyDoFn())
+
+    with self.assertRaisesRegex(typehints.TypeCheckError,
+                                r'requires.*int.*got.*str'):
+      _ = [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn()))
+
+  def test_typed_dofn_method_with_class_decorators(self):
+    # Class decorators take precedence over PEP 484 hints.
+    @typehints.with_input_types(typehints.Tuple[int, int])
+    @typehints.with_output_types(int)
+    class MyDoFn(beam.DoFn):
+      def process(self, element: int) -> typehints.Tuple[str]:
+        yield element[0]
+
+    result = [(1, 2)] | beam.ParDo(MyDoFn())
+    self.assertEqual([1], sorted(result))
+
+    with self.assertRaisesRegex(typehints.TypeCheckError,
+                                r'requires.*Tuple\[int, int\].*got.*str'):
+      _ = ['a', 'b', 'c'] | beam.ParDo(MyDoFn())
+
+    with self.assertRaisesRegex(typehints.TypeCheckError,
+                                r'requires.*Tuple\[int, int\].*got.*int'):
+      _ = [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn()))
+
   def test_typed_callable_iterable_output(self):
-    @typehints.with_input_types(int)
-    @typehints.with_output_types(typehints.Iterable[typehints.Iterable[str]])
-    def do_fn(element):
+    # Only the outer Iterable should be stripped.
+    def do_fn(element: int) -> typehints.Iterable[typehints.Iterable[str]]:
       return [[str(element)] * 2]
 
     result = [1, 2] | beam.ParDo(do_fn)
     self.assertEqual([['1', '1'], ['2', '2']], sorted(result))
 
   def test_typed_dofn_instance(self):
+    # Type hints applied to DoFn instance take precedence over decorators and
+    # process annotations.
+    @typehints.with_input_types(typehints.Tuple[int, int])
+    @typehints.with_output_types(int)
     class MyDoFn(beam.DoFn):
-      def process(self, element):
+      def process(self, element: typehints.Tuple[int, int]) -> \
+              typehints.List[int]:
         return [str(element)]
 
     my_do_fn = MyDoFn().with_input_types(int).with_output_types(str)
@@ -119,11 +157,34 @@ class MainInputTest(unittest.TestCase):
     result = [1, 2, 3] | beam.ParDo(my_do_fn)
     self.assertEqual(['1', '2', '3'], sorted(result))
 
-    with self.assertRaises(typehints.TypeCheckError):
-      ['a', 'b', 'c'] | beam.ParDo(my_do_fn)
+    with self.assertRaisesRegex(typehints.TypeCheckError,
+                                r'requires.*int.*got.*str'):
+      _ = ['a', 'b', 'c'] | beam.ParDo(my_do_fn)
 
-    with self.assertRaises(typehints.TypeCheckError):
-      [1, 2, 3] | (beam.ParDo(my_do_fn) | 'again' >> beam.ParDo(my_do_fn))
+    with self.assertRaisesRegex(typehints.TypeCheckError,
+                                r'requires.*int.*got.*str'):
+      _ = [1, 2, 3] | (beam.ParDo(my_do_fn) | 'again' >> beam.ParDo(my_do_fn))
+
+  def test_typed_callable_instance(self):
+    # Type hints applied to ParDo instance take precedence over callable
+    # decorators and annotations.
+    @typehints.with_input_types(typehints.Tuple[int, int])
+    @typehints.with_output_types(typehints.Generator[int])
+    def do_fn(element: typehints.Tuple[int, int]) -> typehints.Generator[str]:
+      yield str(element)
+
+    pardo = beam.ParDo(do_fn).with_input_types(int).with_output_types(str)
+
+    result = [1, 2, 3] | pardo
+    self.assertEqual(['1', '2', '3'], sorted(result))
+
+    with self.assertRaisesRegex(typehints.TypeCheckError,
+                                r'requires.*int.*got.*str'):
+      _ = ['a', 'b', 'c'] | pardo
+
+    with self.assertRaisesRegex(typehints.TypeCheckError,
+                                r'requires.*int.*got.*str'):
+      _ = [1, 2, 3] | (pardo | 'again' >> pardo)
 
   def test_filter_type_hint(self):
     @typehints.with_input_types(int)
@@ -264,6 +325,308 @@ class MainInputTest(unittest.TestCase):
             'strings': pcoll2, 'integers': pcoll1
         } | 'fails' >> multi_input('additional_arg')
 
+  def test_typed_dofn_method_not_iterable(self):
+    class MyDoFn(beam.DoFn):
+      def process(self, element: int) -> str:
+        return str(element)
+
+    with self.assertRaisesRegex(ValueError, r'str.*is not iterable'):
+      _ = [1, 2, 3] | beam.ParDo(MyDoFn())
+
+  def test_typed_dofn_method_return_none(self):
+    class MyDoFn(beam.DoFn):
+      def process(self, unused_element: int) -> None:
+        pass
+
+    result = [1, 2, 3] | beam.ParDo(MyDoFn())
+    self.assertListEqual([], result)
+
+  def test_typed_dofn_method_return_optional(self):
+    class MyDoFn(beam.DoFn):
+      def process(
+          self,
+          unused_element: int) -> typehints.Optional[typehints.Iterable[int]]:
+        pass
+
+    result = [1, 2, 3] | beam.ParDo(MyDoFn())
+    self.assertListEqual([], result)
+
+  def test_typed_dofn_method_return_optional_not_iterable(self):
+    class MyDoFn(beam.DoFn):
+      def process(self, unused_element: int) -> typehints.Optional[int]:
+        pass
+
+    with self.assertRaisesRegex(ValueError, r'int.*is not iterable'):
+      _ = [1, 2, 3] | beam.ParDo(MyDoFn())
+
+  def test_typed_callable_not_iterable(self):
+    def do_fn(element: int) -> int:
+      return element
+
+    with self.assertRaisesRegex(typehints.TypeCheckError,
+                                r'int.*is not iterable'):
+      _ = [1, 2, 3] | beam.ParDo(do_fn)
+
+  def test_typed_dofn_kwonly(self):
+    class MyDoFn(beam.DoFn):
+      # TODO(BEAM-5878): A kwonly argument like
+      #   timestamp=beam.DoFn.TimestampParam would not work here.
+      def process(self, element: int, *, side_input: str) -> \
+          typehints.Generator[typehints.Optional[str]]:
+        yield str(element) if side_input else None
+
+    my_do_fn = MyDoFn()
+
+    result = [1, 2, 3] | beam.ParDo(my_do_fn, side_input='abc')
+    self.assertEqual(['1', '2', '3'], sorted(result))
+
+    with self.assertRaisesRegex(typehints.TypeCheckError,
+                                r'requires.*str.*got.*int.*side_input'):
+      _ = [1, 2, 3] | beam.ParDo(my_do_fn, side_input=1)
+
+  def test_typed_dofn_var_kwargs(self):
+    class MyDoFn(beam.DoFn):
+      def process(self, element: int, **side_inputs: typehints.Dict[str, str]) 
\
+          -> typehints.Generator[typehints.Optional[str]]:
+        yield str(element) if side_inputs else None
+
+    my_do_fn = MyDoFn()
+
+    result = [1, 2, 3] | beam.ParDo(my_do_fn, foo='abc', bar='def')
+    self.assertEqual(['1', '2', '3'], sorted(result))
+
+    with self.assertRaisesRegex(typehints.TypeCheckError,
+                                r'requires.*str.*got.*int.*side_inputs'):
+      _ = [1, 2, 3] | beam.ParDo(my_do_fn, a=1)
+
+  def test_typed_callable_string_literals(self):
+    def do_fn(element: 'int') -> 'typehints.List[str]':
+      return [[str(element)] * 2]
+
+    result = [1, 2] | beam.ParDo(do_fn)
+    self.assertEqual([['1', '1'], ['2', '2']], sorted(result))
+
+  def test_typed_ptransform_fn(self):
+    # Test that type hints are propagated to the created PTransform.
+    @beam.ptransform_fn
+    @typehints.with_input_types(int)
+    def MyMap(pcoll):
+      def fn(element: int):
+        yield element
+
+      return pcoll | beam.ParDo(fn)
+
+    self.assertListEqual([1, 2, 3], [1, 2, 3] | MyMap())
+    with self.assertRaisesRegex(typehints.TypeCheckError, r'int.*got.*str'):
+      _ = ['a'] | MyMap()
+
+  def test_typed_ptransform_fn_conflicting_hints(self):
+    # In this case, both MyMap and its contained ParDo have separate type
+    # checks (that disagree with each other).
+    @beam.ptransform_fn
+    @typehints.with_input_types(int)
+    def MyMap(pcoll):
+      def fn(element: float):
+        yield element
+
+      return pcoll | beam.ParDo(fn)
+
+    with self.assertRaisesRegex(typehints.TypeCheckError,
+                                r'ParDo.*requires.*float.*got.*int'):
+      _ = [1, 2, 3] | MyMap()
+    with self.assertRaisesRegex(typehints.TypeCheckError,
+                                r'MyMap.*expected.*int.*got.*str'):
+      _ = ['a'] | MyMap()
+
+  def test_typed_dofn_string_literals(self):
+    class MyDoFn(beam.DoFn):
+      def process(self, element: 'int') -> 'typehints.List[str]':
+        return [[str(element)] * 2]
+
+    result = [1, 2] | beam.ParDo(MyDoFn())
+    self.assertEqual([['1', '1'], ['2', '2']], sorted(result))
+
+  def test_typed_map(self):
+    def fn(element: int) -> int:
+      return element * 2
+
+    result = [1, 2, 3] | beam.Map(fn)
+    self.assertEqual([2, 4, 6], sorted(result))
+
+  def test_typed_map_return_optional(self):
+    # None is a valid element value for Map.
+    def fn(element: int) -> typehints.Optional[int]:
+      if element > 1:
+        return element
+
+    result = [1, 2, 3] | beam.Map(fn)
+    self.assertCountEqual([None, 2, 3], result)
+
+  def test_typed_flatmap(self):
+    def fn(element: int) -> typehints.Iterable[int]:
+      yield element * 2
+
+    result = [1, 2, 3] | beam.FlatMap(fn)
+    self.assertCountEqual([2, 4, 6], result)
+
+  def test_typed_flatmap_output_hint_not_iterable(self):
+    def fn(element: int) -> int:
+      return element * 2
+
+    # This is raised (originally) in strip_iterable.
+    with self.assertRaisesRegex(typehints.TypeCheckError,
+                                r'int.*is not iterable'):
+      _ = [1, 2, 3] | beam.FlatMap(fn)
+
+  def test_typed_flatmap_output_value_not_iterable(self):
+    def fn(element: int) -> typehints.Iterable[int]:
+      return element * 2
+
+    # This is raised in runners/common.py (process_outputs).
+    with self.assertRaisesRegex(TypeError, r'int.*is not iterable'):
+      _ = [1, 2, 3] | beam.FlatMap(fn)
+
+  def test_typed_flatmap_optional(self):
+    def fn(element: int) -> typehints.Optional[typehints.Iterable[int]]:
+      if element > 1:
+        yield element * 2
+
+    # Verify that the output type of fn is int and not Optional[int].
+    def fn2(element: int) -> int:
+      return element
+
+    result = [1, 2, 3] | beam.FlatMap(fn) | beam.Map(fn2)
+    self.assertCountEqual([4, 6], result)
+
+  def test_typed_ptransform_with_no_error(self):
+    class StrToInt(beam.PTransform):
+      def expand(
+          self,
+          pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[int]:
+        return pcoll | beam.Map(lambda x: int(x))
+
+    class IntToStr(beam.PTransform):
+      def expand(
+          self,
+          pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
+        return pcoll | beam.Map(lambda x: str(x))
+
+    _ = ['1', '2', '3'] | StrToInt() | IntToStr()
+
+  def test_typed_ptransform_with_bad_typehints(self):
+    class StrToInt(beam.PTransform):
+      def expand(
+          self,
+          pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[int]:
+        return pcoll | beam.Map(lambda x: int(x))
+
+    class IntToStr(beam.PTransform):
+      def expand(
+          self,
+          pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[str]:
+        return pcoll | beam.Map(lambda x: str(x))
+
+    with self.assertRaisesRegex(typehints.TypeCheckError,
+                                "Input type hint violation at IntToStr: "
+                                "expected <class 'str'>, got <class 'int'>"):
+      _ = ['1', '2', '3'] | StrToInt() | IntToStr()
+
+  def test_typed_ptransform_with_bad_input(self):
+    class StrToInt(beam.PTransform):
+      def expand(
+          self,
+          pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[int]:
+        return pcoll | beam.Map(lambda x: int(x))
+
+    class IntToStr(beam.PTransform):
+      def expand(
+          self,
+          pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
+        return pcoll | beam.Map(lambda x: str(x))
+
+    with self.assertRaisesRegex(typehints.TypeCheckError,
+                                "Input type hint violation at StrToInt: "
+                                "expected <class 'str'>, got <class 'int'>"):
+      # Feed integers to a PTransform that expects strings
+      _ = [1, 2, 3] | StrToInt() | IntToStr()
+
+  def test_typed_ptransform_with_partial_typehints(self):
+    class StrToInt(beam.PTransform):
+      def expand(self, pcoll) -> beam.pvalue.PCollection[int]:
+        return pcoll | beam.Map(lambda x: int(x))
+
+    class IntToStr(beam.PTransform):
+      def expand(
+          self,
+          pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
+        return pcoll | beam.Map(lambda x: str(x))
+
+    # Feed integers to a PTransform that should expect strings
+    # but has no typehints so it expects any
+    _ = [1, 2, 3] | StrToInt() | IntToStr()
+
+  def test_typed_ptransform_with_bare_wrappers(self):
+    class StrToInt(beam.PTransform):
+      def expand(
+          self, pcoll: beam.pvalue.PCollection) -> beam.pvalue.PCollection:
+        return pcoll | beam.Map(lambda x: int(x))
+
+    class IntToStr(beam.PTransform):
+      def expand(
+          self,
+          pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
+        return pcoll | beam.Map(lambda x: str(x))
+
+    _ = [1, 2, 3] | StrToInt() | IntToStr()
+
+  def test_typed_ptransform_with_no_typehints(self):
+    class StrToInt(beam.PTransform):
+      def expand(self, pcoll):
+        return pcoll | beam.Map(lambda x: int(x))
+
+    class IntToStr(beam.PTransform):
+      def expand(
+          self,
+          pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
+        return pcoll | beam.Map(lambda x: str(x))
+
+    # Feed integers to a PTransform that should expect strings
+    # but has no typehints so it expects any
+    _ = [1, 2, 3] | StrToInt() | IntToStr()
+
+  def test_typed_ptransform_with_generic_annotations(self):
+    T = typing.TypeVar('T')
+
+    class IntToInt(beam.PTransform):
+      def expand(
+          self,
+          pcoll: beam.pvalue.PCollection[T]) -> beam.pvalue.PCollection[T]:
+        return pcoll | beam.Map(lambda x: x)
+
+    class IntToStr(beam.PTransform):
+      def expand(
+          self,
+          pcoll: beam.pvalue.PCollection[T]) -> beam.pvalue.PCollection[str]:
+        return pcoll | beam.Map(lambda x: str(x))
+
+    _ = [1, 2, 3] | IntToInt() | IntToStr()
+
+  def test_typed_ptransform_with_do_outputs_tuple_compiles(self):
+    class MyDoFn(beam.DoFn):
+      def process(self, element: int, *args, **kwargs):
+        if element % 2:
+          yield beam.pvalue.TaggedOutput('odd', 1)
+        else:
+          yield beam.pvalue.TaggedOutput('even', 1)
+
+    class MyPTransform(beam.PTransform):
+      def expand(self, pcoll: beam.pvalue.PCollection[int]):
+        return pcoll | beam.ParDo(MyDoFn()).with_outputs('odd', 'even')
+
+    # This test fails if you remove the following line from ptransform.py
+    # if isinstance(pvalue_, DoOutputsTuple): continue
+    _ = [1, 2, 3] | MyPTransform()
+
 
 class NativeTypesTest(unittest.TestCase):
   def test_good_main_input(self):
@@ -519,6 +882,111 @@ class AnnotationsTest(unittest.TestCase):
     self.assertIsNone(th.input_types)
     self.assertIsNone(th.output_types)
 
+  def test_pardo_dofn(self):
+    class MyDoFn(beam.DoFn):
+      def process(self, element: int) -> typehints.Generator[str]:
+        yield str(element)
+
+    th = beam.ParDo(MyDoFn()).get_type_hints()
+    self.assertEqual(th.input_types, ((int, ), {}))
+    self.assertEqual(th.output_types, ((str, ), {}))
+
+  def test_pardo_dofn_not_iterable(self):
+    class MyDoFn(beam.DoFn):
+      def process(self, element: int) -> str:
+        return str(element)
+
+    with self.assertRaisesRegex(ValueError, r'str.*is not iterable'):
+      _ = beam.ParDo(MyDoFn()).get_type_hints()
+
+  def test_pardo_wrapper(self):
+    def do_fn(element: int) -> typehints.Iterable[str]:
+      return [str(element)]
+
+    th = beam.ParDo(do_fn).get_type_hints()
+    self.assertEqual(th.input_types, ((int, ), {}))
+    self.assertEqual(th.output_types, ((str, ), {}))
+
+  def test_pardo_wrapper_tuple(self):
+    # Test case for callables that return key-value pairs for GBK. The outer
+    # Iterable should be stripped but the inner Tuple left intact.
+    def do_fn(element: int) -> typehints.Iterable[typehints.Tuple[str, int]]:
+      return [(str(element), element)]
+
+    th = beam.ParDo(do_fn).get_type_hints()
+    self.assertEqual(th.input_types, ((int, ), {}))
+    self.assertEqual(th.output_types, ((typehints.Tuple[str, int], ), {}))
+
+  def test_pardo_wrapper_not_iterable(self):
+    def do_fn(element: int) -> str:
+      return str(element)
+
+    with self.assertRaisesRegex(typehints.TypeCheckError,
+                                r'str.*is not iterable'):
+      _ = beam.ParDo(do_fn).get_type_hints()
+
+  def test_flat_map_wrapper(self):
+    def map_fn(element: int) -> typehints.Iterable[int]:
+      return [element, element + 1]
+
+    th = beam.FlatMap(map_fn).get_type_hints()
+    self.assertEqual(th.input_types, ((int, ), {}))
+    self.assertEqual(th.output_types, ((int, ), {}))
+
+  def test_flat_map_wrapper_optional_output(self):
+    # Optional should not affect output type (Nones are ignored).
+    def map_fn(element: int) -> typehints.Optional[typehints.Iterable[int]]:
+      return [element, element + 1]
+
+    th = beam.FlatMap(map_fn).get_type_hints()
+    self.assertEqual(th.input_types, ((int, ), {}))
+    self.assertEqual(th.output_types, ((int, ), {}))
+
+  @unittest.skip('BEAM-8662: Py3 annotations not yet supported for MapTuple')
+  def test_flat_map_tuple_wrapper(self):
+    # TODO(BEAM-8662): Also test with a fn that accepts default arguments.
+    def tuple_map_fn(a: str, b: str, c: str) -> typehints.Iterable[str]:
+      return [a, b, c]
+
+    th = beam.FlatMapTuple(tuple_map_fn).get_type_hints()
+    self.assertEqual(th.input_types, ((str, str, str), {}))
+    self.assertEqual(th.output_types, ((str, ), {}))
+
+  def test_map_wrapper(self):
+    def map_fn(unused_element: int) -> int:
+      return 1
+
+    th = beam.Map(map_fn).get_type_hints()
+    self.assertEqual(th.input_types, ((int, ), {}))
+    self.assertEqual(th.output_types, ((int, ), {}))
+
+  def test_map_wrapper_optional_output(self):
+    # Optional does affect output type (Nones are NOT ignored).
+    def map_fn(unused_element: int) -> typehints.Optional[int]:
+      return 1
+
+    th = beam.Map(map_fn).get_type_hints()
+    self.assertEqual(th.input_types, ((int, ), {}))
+    self.assertEqual(th.output_types, ((typehints.Optional[int], ), {}))
+
+  @unittest.skip('BEAM-8662: Py3 annotations not yet supported for MapTuple')
+  def test_map_tuple(self):
+    # TODO(BEAM-8662): Also test with a fn that accepts default arguments.
+    def tuple_map_fn(a: str, b: str, c: str) -> str:
+      return a + b + c
+
+    th = beam.MapTuple(tuple_map_fn).get_type_hints()
+    self.assertEqual(th.input_types, ((str, str, str), {}))
+    self.assertEqual(th.output_types, ((str, ), {}))
+
+  def test_filter_wrapper(self):
+    def filter_fn(element: int) -> bool:
+      return bool(element % 2)
+
+    th = beam.Filter(filter_fn).get_type_hints()
+    self.assertEqual(th.input_types, ((int, ), {}))
+    self.assertEqual(th.output_types, ((int, ), {}))
+
 
 if __name__ == '__main__':
   unittest.main()
diff --git a/sdks/python/apache_beam/typehints/typed_pipeline_test_py3.py 
b/sdks/python/apache_beam/typehints/typed_pipeline_test_py3.py
deleted file mode 100644
index 6c45c5c..0000000
--- a/sdks/python/apache_beam/typehints/typed_pipeline_test_py3.py
+++ /dev/null
@@ -1,535 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for type-hint objects and decorators - Python 3 syntax specific.
-"""
-
-# pytype: skip-file
-
-from __future__ import absolute_import
-
-import typing
-import unittest
-
-import apache_beam as beam
-from apache_beam import typehints
-
-
-class MainInputTest(unittest.TestCase):
-  def assertStartswith(self, msg, prefix):
-    self.assertTrue(
-        msg.startswith(prefix), '"%s" does not start with "%s"' % (msg, 
prefix))
-
-  def test_typed_dofn_method(self):
-    class MyDoFn(beam.DoFn):
-      def process(self, element: int) -> typehints.Tuple[str]:
-        return tuple(str(element))
-
-    result = [1, 2, 3] | beam.ParDo(MyDoFn())
-    self.assertEqual(['1', '2', '3'], sorted(result))
-
-    with self.assertRaisesRegex(typehints.TypeCheckError,
-                                r'requires.*int.*got.*str'):
-      _ = ['a', 'b', 'c'] | beam.ParDo(MyDoFn())
-
-    with self.assertRaisesRegex(typehints.TypeCheckError,
-                                r'requires.*int.*got.*str'):
-      _ = [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn()))
-
-  def test_typed_dofn_method_with_class_decorators(self):
-    # Class decorators take precedence over PEP 484 hints.
-    @typehints.with_input_types(typehints.Tuple[int, int])
-    @typehints.with_output_types(int)
-    class MyDoFn(beam.DoFn):
-      def process(self, element: int) -> typehints.Tuple[str]:
-        yield element[0]
-
-    result = [(1, 2)] | beam.ParDo(MyDoFn())
-    self.assertEqual([1], sorted(result))
-
-    with self.assertRaisesRegex(typehints.TypeCheckError,
-                                r'requires.*Tuple\[int, int\].*got.*str'):
-      _ = ['a', 'b', 'c'] | beam.ParDo(MyDoFn())
-
-    with self.assertRaisesRegex(typehints.TypeCheckError,
-                                r'requires.*Tuple\[int, int\].*got.*int'):
-      _ = [1, 2, 3] | (beam.ParDo(MyDoFn()) | 'again' >> beam.ParDo(MyDoFn()))
-
-  def test_typed_dofn_instance(self):
-    # Type hints applied to DoFn instance take precedence over decorators and
-    # process annotations.
-    @typehints.with_input_types(typehints.Tuple[int, int])
-    @typehints.with_output_types(int)
-    class MyDoFn(beam.DoFn):
-      def process(self, element: typehints.Tuple[int, int]) -> \
-          typehints.List[int]:
-        return [str(element)]
-
-    my_do_fn = MyDoFn().with_input_types(int).with_output_types(str)
-
-    result = [1, 2, 3] | beam.ParDo(my_do_fn)
-    self.assertEqual(['1', '2', '3'], sorted(result))
-
-    with self.assertRaisesRegex(typehints.TypeCheckError,
-                                r'requires.*int.*got.*str'):
-      _ = ['a', 'b', 'c'] | beam.ParDo(my_do_fn)
-
-    with self.assertRaisesRegex(typehints.TypeCheckError,
-                                r'requires.*int.*got.*str'):
-      _ = [1, 2, 3] | (beam.ParDo(my_do_fn) | 'again' >> beam.ParDo(my_do_fn))
-
-  def test_typed_callable_instance(self):
-    # Type hints applied to ParDo instance take precedence over callable
-    # decorators and annotations.
-    @typehints.with_input_types(typehints.Tuple[int, int])
-    @typehints.with_output_types(typehints.Generator[int])
-    def do_fn(element: typehints.Tuple[int, int]) -> typehints.Generator[str]:
-      yield str(element)
-
-    pardo = beam.ParDo(do_fn).with_input_types(int).with_output_types(str)
-
-    result = [1, 2, 3] | pardo
-    self.assertEqual(['1', '2', '3'], sorted(result))
-
-    with self.assertRaisesRegex(typehints.TypeCheckError,
-                                r'requires.*int.*got.*str'):
-      _ = ['a', 'b', 'c'] | pardo
-
-    with self.assertRaisesRegex(typehints.TypeCheckError,
-                                r'requires.*int.*got.*str'):
-      _ = [1, 2, 3] | (pardo | 'again' >> pardo)
-
-  def test_typed_callable_iterable_output(self):
-    # Only the outer Iterable should be stripped.
-    def do_fn(element: int) -> typehints.Iterable[typehints.Iterable[str]]:
-      return [[str(element)] * 2]
-
-    result = [1, 2] | beam.ParDo(do_fn)
-    self.assertEqual([['1', '1'], ['2', '2']], sorted(result))
-
-  def test_typed_dofn_method_not_iterable(self):
-    class MyDoFn(beam.DoFn):
-      def process(self, element: int) -> str:
-        return str(element)
-
-    with self.assertRaisesRegex(ValueError, r'str.*is not iterable'):
-      _ = [1, 2, 3] | beam.ParDo(MyDoFn())
-
-  def test_typed_dofn_method_return_none(self):
-    class MyDoFn(beam.DoFn):
-      def process(self, unused_element: int) -> None:
-        pass
-
-    result = [1, 2, 3] | beam.ParDo(MyDoFn())
-    self.assertListEqual([], result)
-
-  def test_typed_dofn_method_return_optional(self):
-    class MyDoFn(beam.DoFn):
-      def process(
-          self,
-          unused_element: int) -> typehints.Optional[typehints.Iterable[int]]:
-        pass
-
-    result = [1, 2, 3] | beam.ParDo(MyDoFn())
-    self.assertListEqual([], result)
-
-  def test_typed_dofn_method_return_optional_not_iterable(self):
-    class MyDoFn(beam.DoFn):
-      def process(self, unused_element: int) -> typehints.Optional[int]:
-        pass
-
-    with self.assertRaisesRegex(ValueError, r'int.*is not iterable'):
-      _ = [1, 2, 3] | beam.ParDo(MyDoFn())
-
-  def test_typed_callable_not_iterable(self):
-    def do_fn(element: int) -> int:
-      return element
-
-    with self.assertRaisesRegex(typehints.TypeCheckError,
-                                r'int.*is not iterable'):
-      _ = [1, 2, 3] | beam.ParDo(do_fn)
-
-  def test_typed_dofn_kwonly(self):
-    class MyDoFn(beam.DoFn):
-      # TODO(BEAM-5878): A kwonly argument like
-      #   timestamp=beam.DoFn.TimestampParam would not work here.
-      def process(self, element: int, *, side_input: str) -> \
-          typehints.Generator[typehints.Optional[str]]:
-        yield str(element) if side_input else None
-
-    my_do_fn = MyDoFn()
-
-    result = [1, 2, 3] | beam.ParDo(my_do_fn, side_input='abc')
-    self.assertEqual(['1', '2', '3'], sorted(result))
-
-    with self.assertRaisesRegex(typehints.TypeCheckError,
-                                r'requires.*str.*got.*int.*side_input'):
-      _ = [1, 2, 3] | beam.ParDo(my_do_fn, side_input=1)
-
-  def test_typed_dofn_var_kwargs(self):
-    class MyDoFn(beam.DoFn):
-      def process(self, element: int, **side_inputs: typehints.Dict[str, str]) 
\
-          -> typehints.Generator[typehints.Optional[str]]:
-        yield str(element) if side_inputs else None
-
-    my_do_fn = MyDoFn()
-
-    result = [1, 2, 3] | beam.ParDo(my_do_fn, foo='abc', bar='def')
-    self.assertEqual(['1', '2', '3'], sorted(result))
-
-    with self.assertRaisesRegex(typehints.TypeCheckError,
-                                r'requires.*str.*got.*int.*side_inputs'):
-      _ = [1, 2, 3] | beam.ParDo(my_do_fn, a=1)
-
-  def test_typed_callable_string_literals(self):
-    def do_fn(element: 'int') -> 'typehints.List[str]':
-      return [[str(element)] * 2]
-
-    result = [1, 2] | beam.ParDo(do_fn)
-    self.assertEqual([['1', '1'], ['2', '2']], sorted(result))
-
-  def test_typed_ptransform_fn(self):
-    # Test that type hints are propagated to the created PTransform.
-    @beam.ptransform_fn
-    @typehints.with_input_types(int)
-    def MyMap(pcoll):
-      def fn(element: int):
-        yield element
-
-      return pcoll | beam.ParDo(fn)
-
-    self.assertListEqual([1, 2, 3], [1, 2, 3] | MyMap())
-    with self.assertRaisesRegex(typehints.TypeCheckError, r'int.*got.*str'):
-      _ = ['a'] | MyMap()
-
-  def test_typed_ptransform_fn_conflicting_hints(self):
-    # In this case, both MyMap and its contained ParDo have separate type
-    # checks (that disagree with each other).
-    @beam.ptransform_fn
-    @typehints.with_input_types(int)
-    def MyMap(pcoll):
-      def fn(element: float):
-        yield element
-
-      return pcoll | beam.ParDo(fn)
-
-    with self.assertRaisesRegex(typehints.TypeCheckError,
-                                r'ParDo.*requires.*float.*got.*int'):
-      _ = [1, 2, 3] | MyMap()
-    with self.assertRaisesRegex(typehints.TypeCheckError,
-                                r'MyMap.*expected.*int.*got.*str'):
-      _ = ['a'] | MyMap()
-
-  def test_typed_dofn_string_literals(self):
-    class MyDoFn(beam.DoFn):
-      def process(self, element: 'int') -> 'typehints.List[str]':
-        return [[str(element)] * 2]
-
-    result = [1, 2] | beam.ParDo(MyDoFn())
-    self.assertEqual([['1', '1'], ['2', '2']], sorted(result))
-
-  def test_typed_map(self):
-    def fn(element: int) -> int:
-      return element * 2
-
-    result = [1, 2, 3] | beam.Map(fn)
-    self.assertEqual([2, 4, 6], sorted(result))
-
-  def test_typed_map_return_optional(self):
-    # None is a valid element value for Map.
-    def fn(element: int) -> typehints.Optional[int]:
-      if element > 1:
-        return element
-
-    result = [1, 2, 3] | beam.Map(fn)
-    self.assertCountEqual([None, 2, 3], result)
-
-  def test_typed_flatmap(self):
-    def fn(element: int) -> typehints.Iterable[int]:
-      yield element * 2
-
-    result = [1, 2, 3] | beam.FlatMap(fn)
-    self.assertCountEqual([2, 4, 6], result)
-
-  def test_typed_flatmap_output_hint_not_iterable(self):
-    def fn(element: int) -> int:
-      return element * 2
-
-    # This is raised (originally) in strip_iterable.
-    with self.assertRaisesRegex(typehints.TypeCheckError,
-                                r'int.*is not iterable'):
-      _ = [1, 2, 3] | beam.FlatMap(fn)
-
-  def test_typed_flatmap_output_value_not_iterable(self):
-    def fn(element: int) -> typehints.Iterable[int]:
-      return element * 2
-
-    # This is raised in runners/common.py (process_outputs).
-    with self.assertRaisesRegex(TypeError, r'int.*is not iterable'):
-      _ = [1, 2, 3] | beam.FlatMap(fn)
-
-  def test_typed_flatmap_optional(self):
-    def fn(element: int) -> typehints.Optional[typehints.Iterable[int]]:
-      if element > 1:
-        yield element * 2
-
-    # Verify that the output type of fn is int and not Optional[int].
-    def fn2(element: int) -> int:
-      return element
-
-    result = [1, 2, 3] | beam.FlatMap(fn) | beam.Map(fn2)
-    self.assertCountEqual([4, 6], result)
-
-  def test_typed_ptransform_with_no_error(self):
-    class StrToInt(beam.PTransform):
-      def expand(
-          self,
-          pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[int]:
-        return pcoll | beam.Map(lambda x: int(x))
-
-    class IntToStr(beam.PTransform):
-      def expand(
-          self,
-          pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
-        return pcoll | beam.Map(lambda x: str(x))
-
-    _ = ['1', '2', '3'] | StrToInt() | IntToStr()
-
-  def test_typed_ptransform_with_bad_typehints(self):
-    class StrToInt(beam.PTransform):
-      def expand(
-          self,
-          pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[int]:
-        return pcoll | beam.Map(lambda x: int(x))
-
-    class IntToStr(beam.PTransform):
-      def expand(
-          self,
-          pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[str]:
-        return pcoll | beam.Map(lambda x: str(x))
-
-    with self.assertRaisesRegex(typehints.TypeCheckError,
-                                "Input type hint violation at IntToStr: "
-                                "expected <class 'str'>, got <class 'int'>"):
-      _ = ['1', '2', '3'] | StrToInt() | IntToStr()
-
-  def test_typed_ptransform_with_bad_input(self):
-    class StrToInt(beam.PTransform):
-      def expand(
-          self,
-          pcoll: beam.pvalue.PCollection[str]) -> beam.pvalue.PCollection[int]:
-        return pcoll | beam.Map(lambda x: int(x))
-
-    class IntToStr(beam.PTransform):
-      def expand(
-          self,
-          pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
-        return pcoll | beam.Map(lambda x: str(x))
-
-    with self.assertRaisesRegex(typehints.TypeCheckError,
-                                "Input type hint violation at StrToInt: "
-                                "expected <class 'str'>, got <class 'int'>"):
-      # Feed integers to a PTransform that expects strings
-      _ = [1, 2, 3] | StrToInt() | IntToStr()
-
-  def test_typed_ptransform_with_partial_typehints(self):
-    class StrToInt(beam.PTransform):
-      def expand(self, pcoll) -> beam.pvalue.PCollection[int]:
-        return pcoll | beam.Map(lambda x: int(x))
-
-    class IntToStr(beam.PTransform):
-      def expand(
-          self,
-          pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
-        return pcoll | beam.Map(lambda x: str(x))
-
-    # Feed integers to a PTransform that should expect strings
-    # but has no typehints so it expects any
-    _ = [1, 2, 3] | StrToInt() | IntToStr()
-
-  def test_typed_ptransform_with_bare_wrappers(self):
-    class StrToInt(beam.PTransform):
-      def expand(
-          self, pcoll: beam.pvalue.PCollection) -> beam.pvalue.PCollection:
-        return pcoll | beam.Map(lambda x: int(x))
-
-    class IntToStr(beam.PTransform):
-      def expand(
-          self,
-          pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
-        return pcoll | beam.Map(lambda x: str(x))
-
-    _ = [1, 2, 3] | StrToInt() | IntToStr()
-
-  def test_typed_ptransform_with_no_typehints(self):
-    class StrToInt(beam.PTransform):
-      def expand(self, pcoll):
-        return pcoll | beam.Map(lambda x: int(x))
-
-    class IntToStr(beam.PTransform):
-      def expand(
-          self,
-          pcoll: beam.pvalue.PCollection[int]) -> beam.pvalue.PCollection[str]:
-        return pcoll | beam.Map(lambda x: str(x))
-
-    # Feed integers to a PTransform that should expect strings
-    # but has no typehints so it expects any
-    _ = [1, 2, 3] | StrToInt() | IntToStr()
-
-  def test_typed_ptransform_with_generic_annotations(self):
-    T = typing.TypeVar('T')
-
-    class IntToInt(beam.PTransform):
-      def expand(
-          self,
-          pcoll: beam.pvalue.PCollection[T]) -> beam.pvalue.PCollection[T]:
-        return pcoll | beam.Map(lambda x: x)
-
-    class IntToStr(beam.PTransform):
-      def expand(
-          self,
-          pcoll: beam.pvalue.PCollection[T]) -> beam.pvalue.PCollection[str]:
-        return pcoll | beam.Map(lambda x: str(x))
-
-    _ = [1, 2, 3] | IntToInt() | IntToStr()
-
-  def test_typed_ptransform_with_do_outputs_tuple_compiles(self):
-    class MyDoFn(beam.DoFn):
-      def process(self, element: int, *args, **kwargs):
-        if element % 2:
-          yield beam.pvalue.TaggedOutput('odd', 1)
-        else:
-          yield beam.pvalue.TaggedOutput('even', 1)
-
-    class MyPTransform(beam.PTransform):
-      def expand(self, pcoll: beam.pvalue.PCollection[int]):
-        return pcoll | beam.ParDo(MyDoFn()).with_outputs('odd', 'even')
-
-    # This test fails if you remove the following line from ptransform.py
-    # if isinstance(pvalue_, DoOutputsTuple): continue
-    _ = [1, 2, 3] | MyPTransform()
-
-
-class AnnotationsTest(unittest.TestCase):
-  def test_pardo_dofn(self):
-    class MyDoFn(beam.DoFn):
-      def process(self, element: int) -> typehints.Generator[str]:
-        yield str(element)
-
-    th = beam.ParDo(MyDoFn()).get_type_hints()
-    self.assertEqual(th.input_types, ((int, ), {}))
-    self.assertEqual(th.output_types, ((str, ), {}))
-
-  def test_pardo_dofn_not_iterable(self):
-    class MyDoFn(beam.DoFn):
-      def process(self, element: int) -> str:
-        return str(element)
-
-    with self.assertRaisesRegex(ValueError, r'str.*is not iterable'):
-      _ = beam.ParDo(MyDoFn()).get_type_hints()
-
-  def test_pardo_wrapper(self):
-    def do_fn(element: int) -> typehints.Iterable[str]:
-      return [str(element)]
-
-    th = beam.ParDo(do_fn).get_type_hints()
-    self.assertEqual(th.input_types, ((int, ), {}))
-    self.assertEqual(th.output_types, ((str, ), {}))
-
-  def test_pardo_wrapper_tuple(self):
-    # Test case for callables that return key-value pairs for GBK. The outer
-    # Iterable should be stripped but the inner Tuple left intact.
-    def do_fn(element: int) -> typehints.Iterable[typehints.Tuple[str, int]]:
-      return [(str(element), element)]
-
-    th = beam.ParDo(do_fn).get_type_hints()
-    self.assertEqual(th.input_types, ((int, ), {}))
-    self.assertEqual(th.output_types, ((typehints.Tuple[str, int], ), {}))
-
-  def test_pardo_wrapper_not_iterable(self):
-    def do_fn(element: int) -> str:
-      return str(element)
-
-    with self.assertRaisesRegex(typehints.TypeCheckError,
-                                r'str.*is not iterable'):
-      _ = beam.ParDo(do_fn).get_type_hints()
-
-  def test_flat_map_wrapper(self):
-    def map_fn(element: int) -> typehints.Iterable[int]:
-      return [element, element + 1]
-
-    th = beam.FlatMap(map_fn).get_type_hints()
-    self.assertEqual(th.input_types, ((int, ), {}))
-    self.assertEqual(th.output_types, ((int, ), {}))
-
-  def test_flat_map_wrapper_optional_output(self):
-    # Optional should not affect output type (Nones are ignored).
-    def map_fn(element: int) -> typehints.Optional[typehints.Iterable[int]]:
-      return [element, element + 1]
-
-    th = beam.FlatMap(map_fn).get_type_hints()
-    self.assertEqual(th.input_types, ((int, ), {}))
-    self.assertEqual(th.output_types, ((int, ), {}))
-
-  @unittest.skip('BEAM-8662: Py3 annotations not yet supported for MapTuple')
-  def test_flat_map_tuple_wrapper(self):
-    # TODO(BEAM-8662): Also test with a fn that accepts default arguments.
-    def tuple_map_fn(a: str, b: str, c: str) -> typehints.Iterable[str]:
-      return [a, b, c]
-
-    th = beam.FlatMapTuple(tuple_map_fn).get_type_hints()
-    self.assertEqual(th.input_types, ((str, str, str), {}))
-    self.assertEqual(th.output_types, ((str, ), {}))
-
-  def test_map_wrapper(self):
-    def map_fn(unused_element: int) -> int:
-      return 1
-
-    th = beam.Map(map_fn).get_type_hints()
-    self.assertEqual(th.input_types, ((int, ), {}))
-    self.assertEqual(th.output_types, ((int, ), {}))
-
-  def test_map_wrapper_optional_output(self):
-    # Optional does affect output type (Nones are NOT ignored).
-    def map_fn(unused_element: int) -> typehints.Optional[int]:
-      return 1
-
-    th = beam.Map(map_fn).get_type_hints()
-    self.assertEqual(th.input_types, ((int, ), {}))
-    self.assertEqual(th.output_types, ((typehints.Optional[int], ), {}))
-
-  @unittest.skip('BEAM-8662: Py3 annotations not yet supported for MapTuple')
-  def test_map_tuple(self):
-    # TODO(BEAM-8662): Also test with a fn that accepts default arguments.
-    def tuple_map_fn(a: str, b: str, c: str) -> str:
-      return a + b + c
-
-    th = beam.MapTuple(tuple_map_fn).get_type_hints()
-    self.assertEqual(th.input_types, ((str, str, str), {}))
-    self.assertEqual(th.output_types, ((str, ), {}))
-
-  def test_filter_wrapper(self):
-    def filter_fn(element: int) -> bool:
-      return bool(element % 2)
-
-    th = beam.Filter(filter_fn).get_type_hints()
-    self.assertEqual(th.input_types, ((int, ), {}))
-    self.assertEqual(th.output_types, ((int, ), {}))
-
-
-if __name__ == '__main__':
-  unittest.main()
diff --git a/sdks/python/apache_beam/typehints/typehints.py 
b/sdks/python/apache_beam/typehints/typehints.py
index 615a847..3ff8224 100644
--- a/sdks/python/apache_beam/typehints/typehints.py
+++ b/sdks/python/apache_beam/typehints/typehints.py
@@ -65,16 +65,10 @@ In addition, type-hints can be used to implement run-time 
type-checking via the
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 import collections
 import copy
 import logging
 import typing
-from builtins import next
-from builtins import zip
-
-from future.utils import with_metaclass
 
 __all__ = [
     'Any',
@@ -1038,8 +1032,7 @@ class IteratorHint(CompositeTypeHint):
 IteratorTypeConstraint = IteratorHint.IteratorTypeConstraint
 
 
-class WindowedTypeConstraint(with_metaclass(GetitemConstructor, TypeConstraint)
-                             ):  # type: ignore[misc]
+class WindowedTypeConstraint(TypeConstraint, metaclass=GetitemConstructor):
   """A type constraint for WindowedValue objects.
 
   Mostly for internal use.
diff --git a/sdks/python/apache_beam/typehints/typehints_test.py 
b/sdks/python/apache_beam/typehints/typehints_test.py
index 4ff3040..a469175 100644
--- a/sdks/python/apache_beam/typehints/typehints_test.py
+++ b/sdks/python/apache_beam/typehints/typehints_test.py
@@ -19,19 +19,21 @@
 
 # pytype: skip-file
 
-from __future__ import absolute_import
-
 import functools
 import sys
+import typing
 import unittest
-from builtins import next
-from builtins import range
-
-# patches unittest.TestCase to be python3 compatible
-import future.tests.base  # pylint: disable=unused-import
 
 import apache_beam.typehints.typehints as typehints
+from apache_beam import Map
+from apache_beam import PTransform
+from apache_beam.pvalue import PBegin
+from apache_beam.pvalue import PCollection
+from apache_beam.pvalue import PDone
+from apache_beam.transforms.core import DoFn
+from apache_beam.typehints import KV
 from apache_beam.typehints import Any
+from apache_beam.typehints import Iterable
 from apache_beam.typehints import Tuple
 from apache_beam.typehints import TypeCheckError
 from apache_beam.typehints import Union
@@ -1283,5 +1285,237 @@ class TestCoerceToKvType(TypeHintTestCase):
         typehints.coerce_to_kv_type(*args)
 
 
+class TestParDoAnnotations(unittest.TestCase):
+  def test_with_side_input(self):
+    class MyDoFn(DoFn):
+      def process(self, element: float, side_input: str) -> \
+          Iterable[KV[str, float]]:
+        pass
+
+    th = MyDoFn().get_type_hints()
+    self.assertEqual(th.input_types, ((float, str), {}))
+    self.assertEqual(th.output_types, ((KV[str, float], ), {}))
+
+  def test_pep484_annotations(self):
+    class MyDoFn(DoFn):
+      def process(self, element: int) -> Iterable[str]:
+        pass
+
+    th = MyDoFn().get_type_hints()
+    self.assertEqual(th.input_types, ((int, ), {}))
+    self.assertEqual(th.output_types, ((str, ), {}))
+
+
+class TestPTransformAnnotations(unittest.TestCase):
+  def test_pep484_annotations(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: PCollection[int]) -> PCollection[str]:
+        return pcoll | Map(lambda num: str(num))
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, ((int, ), {}))
+    self.assertEqual(th.output_types, ((str, ), {}))
+
+  def test_annotations_without_input_pcollection_wrapper(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: int) -> PCollection[str]:
+        return pcoll | Map(lambda num: str(num))
+
+    error_str = (
+        r'This input type hint will be ignored and not used for '
+        r'type-checking purposes. Typically, input type hints for a '
+        r'PTransform are single (or nested) types wrapped by a '
+        r'PCollection, or PBegin. Got: {} instead.'.format(int))
+
+    with self.assertLogs(level='WARN') as log:
+      MyPTransform().get_type_hints()
+      self.assertIn(error_str, log.output[0])
+
+  def test_annotations_without_output_pcollection_wrapper(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: PCollection[int]) -> str:
+        return pcoll | Map(lambda num: str(num))
+
+    error_str = (
+        r'This output type hint will be ignored and not used for '
+        r'type-checking purposes. Typically, output type hints for a '
+        r'PTransform are single (or nested) types wrapped by a '
+        r'PCollection, PDone, or None. Got: {} instead.'.format(str))
+
+    with self.assertLogs(level='WARN') as log:
+      th = MyPTransform().get_type_hints()
+      self.assertIn(error_str, log.output[0])
+      self.assertEqual(th.input_types, ((int, ), {}))
+      self.assertEqual(th.output_types, None)
+
+  def test_annotations_without_input_internal_type(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: PCollection) -> PCollection[str]:
+        return pcoll | Map(lambda num: str(num))
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, ((Any, ), {}))
+    self.assertEqual(th.output_types, ((str, ), {}))
+
+  def test_annotations_without_output_internal_type(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: PCollection[int]) -> PCollection:
+        return pcoll | Map(lambda num: str(num))
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, ((int, ), {}))
+    self.assertEqual(th.output_types, ((Any, ), {}))
+
+  def test_annotations_without_any_internal_type(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: PCollection) -> PCollection:
+        return pcoll | Map(lambda num: str(num))
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, ((Any, ), {}))
+    self.assertEqual(th.output_types, ((Any, ), {}))
+
+  def test_annotations_without_input_typehint(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll) -> PCollection[str]:
+        return pcoll | Map(lambda num: str(num))
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, ((Any, ), {}))
+    self.assertEqual(th.output_types, ((str, ), {}))
+
+  def test_annotations_without_output_typehint(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: PCollection[int]):
+        return pcoll | Map(lambda num: str(num))
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, ((int, ), {}))
+    self.assertEqual(th.output_types, ((Any, ), {}))
+
+  def test_annotations_without_any_typehints(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll):
+        return pcoll | Map(lambda num: str(num))
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, None)
+    self.assertEqual(th.output_types, None)
+
+  def test_annotations_with_pbegin(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: PBegin):
+        return pcoll | Map(lambda num: str(num))
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, ((Any, ), {}))
+    self.assertEqual(th.output_types, ((Any, ), {}))
+
+  def test_annotations_with_pdone(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll) -> PDone:
+        return pcoll | Map(lambda num: str(num))
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, ((Any, ), {}))
+    self.assertEqual(th.output_types, ((Any, ), {}))
+
+  def test_annotations_with_none_input(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: None) -> PCollection[str]:
+        return pcoll | Map(lambda num: str(num))
+
+    error_str = (
+        r'This input type hint will be ignored and not used for '
+        r'type-checking purposes. Typically, input type hints for a '
+        r'PTransform are single (or nested) types wrapped by a '
+        r'PCollection, or PBegin. Got: {} instead.'.format(None))
+
+    with self.assertLogs(level='WARN') as log:
+      th = MyPTransform().get_type_hints()
+      self.assertIn(error_str, log.output[0])
+      self.assertEqual(th.input_types, None)
+      self.assertEqual(th.output_types, ((str, ), {}))
+
+  def test_annotations_with_none_output(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll) -> None:
+        return pcoll | Map(lambda num: str(num))
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, ((Any, ), {}))
+    self.assertEqual(th.output_types, ((Any, ), {}))
+
+  def test_annotations_with_arbitrary_output(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll) -> str:
+        return pcoll | Map(lambda num: str(num))
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, ((Any, ), {}))
+    self.assertEqual(th.output_types, None)
+
+  def test_annotations_with_arbitrary_input_and_output(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: int) -> str:
+        return pcoll | Map(lambda num: str(num))
+
+    input_error_str = (
+        r'This input type hint will be ignored and not used for '
+        r'type-checking purposes. Typically, input type hints for a '
+        r'PTransform are single (or nested) types wrapped by a '
+        r'PCollection, or PBegin. Got: {} instead.'.format(int))
+
+    output_error_str = (
+        r'This output type hint will be ignored and not used for '
+        r'type-checking purposes. Typically, output type hints for a '
+        r'PTransform are single (or nested) types wrapped by a '
+        r'PCollection, PDone, or None. Got: {} instead.'.format(str))
+
+    with self.assertLogs(level='WARN') as log:
+      th = MyPTransform().get_type_hints()
+      self.assertIn(input_error_str, log.output[0])
+      self.assertIn(output_error_str, log.output[1])
+      self.assertEqual(th.input_types, None)
+      self.assertEqual(th.output_types, None)
+
+  def test_typing_module_annotations_are_converted_to_beam_annotations(self):
+    class MyPTransform(PTransform):
+      def expand(
+          self, pcoll: PCollection[typing.Dict[str, str]]
+      ) -> PCollection[typing.Dict[str, str]]:
+        return pcoll
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, ((typehints.Dict[str, str], ), {}))
+    self.assertEqual(th.input_types, ((typehints.Dict[str, str], ), {}))
+
+  def test_nested_typing_annotations_are_converted_to_beam_annotations(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll:
+         PCollection[typing.Union[int, typing.Any, typing.Dict[str, float]]]) \
+      -> PCollection[typing.Union[int, typing.Any, typing.Dict[str, float]]]:
+        return pcoll
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(
+        th.input_types,
+        ((typehints.Union[int, typehints.Any, typehints.Dict[str,
+                                                             float]], ), {}))
+    self.assertEqual(
+        th.input_types,
+        ((typehints.Union[int, typehints.Any, typehints.Dict[str,
+                                                             float]], ), {}))
+
+  def test_mixed_annotations_are_converted_to_beam_annotations(self):
+    class MyPTransform(PTransform):
+      def expand(self, pcoll: typing.Any) -> typehints.Any:
+        return pcoll
+
+    th = MyPTransform().get_type_hints()
+    self.assertEqual(th.input_types, ((typehints.Any, ), {}))
+    self.assertEqual(th.input_types, ((typehints.Any, ), {}))
+
+
 if __name__ == '__main__':
   unittest.main()
diff --git a/sdks/python/apache_beam/typehints/typehints_test_py3.py 
b/sdks/python/apache_beam/typehints/typehints_test_py3.py
deleted file mode 100644
index 5a36330..0000000
--- a/sdks/python/apache_beam/typehints/typehints_test_py3.py
+++ /dev/null
@@ -1,274 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for the type-hint objects and decorators with Python 3 syntax not
-supported by 2.7."""
-
-# pytype: skip-file
-
-from __future__ import absolute_import
-from __future__ import print_function
-
-import typing
-import unittest
-
-import apache_beam.typehints.typehints as typehints
-from apache_beam import Map
-from apache_beam import PTransform
-from apache_beam.pvalue import PBegin
-from apache_beam.pvalue import PCollection
-from apache_beam.pvalue import PDone
-from apache_beam.transforms.core import DoFn
-from apache_beam.typehints import KV
-from apache_beam.typehints import Iterable
-from apache_beam.typehints.typehints import Any
-
-
-class TestParDoAnnotations(unittest.TestCase):
-  def test_with_side_input(self):
-    class MyDoFn(DoFn):
-      def process(self, element: float, side_input: str) -> \
-          Iterable[KV[str, float]]:
-        pass
-
-    th = MyDoFn().get_type_hints()
-    self.assertEqual(th.input_types, ((float, str), {}))
-    self.assertEqual(th.output_types, ((KV[str, float], ), {}))
-
-  def test_pep484_annotations(self):
-    class MyDoFn(DoFn):
-      def process(self, element: int) -> Iterable[str]:
-        pass
-
-    th = MyDoFn().get_type_hints()
-    self.assertEqual(th.input_types, ((int, ), {}))
-    self.assertEqual(th.output_types, ((str, ), {}))
-
-
-class TestPTransformAnnotations(unittest.TestCase):
-  def test_pep484_annotations(self):
-    class MyPTransform(PTransform):
-      def expand(self, pcoll: PCollection[int]) -> PCollection[str]:
-        return pcoll | Map(lambda num: str(num))
-
-    th = MyPTransform().get_type_hints()
-    self.assertEqual(th.input_types, ((int, ), {}))
-    self.assertEqual(th.output_types, ((str, ), {}))
-
-  def test_annotations_without_input_pcollection_wrapper(self):
-    class MyPTransform(PTransform):
-      def expand(self, pcoll: int) -> PCollection[str]:
-        return pcoll | Map(lambda num: str(num))
-
-    error_str = (
-        r'This input type hint will be ignored and not used for '
-        r'type-checking purposes. Typically, input type hints for a '
-        r'PTransform are single (or nested) types wrapped by a '
-        r'PCollection, or PBegin. Got: {} instead.'.format(int))
-
-    with self.assertLogs(level='WARN') as log:
-      MyPTransform().get_type_hints()
-      self.assertIn(error_str, log.output[0])
-
-  def test_annotations_without_output_pcollection_wrapper(self):
-    class MyPTransform(PTransform):
-      def expand(self, pcoll: PCollection[int]) -> str:
-        return pcoll | Map(lambda num: str(num))
-
-    error_str = (
-        r'This output type hint will be ignored and not used for '
-        r'type-checking purposes. Typically, output type hints for a '
-        r'PTransform are single (or nested) types wrapped by a '
-        r'PCollection, PDone, or None. Got: {} instead.'.format(str))
-
-    with self.assertLogs(level='WARN') as log:
-      th = MyPTransform().get_type_hints()
-      self.assertIn(error_str, log.output[0])
-      self.assertEqual(th.input_types, ((int, ), {}))
-      self.assertEqual(th.output_types, None)
-
-  def test_annotations_without_input_internal_type(self):
-    class MyPTransform(PTransform):
-      def expand(self, pcoll: PCollection) -> PCollection[str]:
-        return pcoll | Map(lambda num: str(num))
-
-    th = MyPTransform().get_type_hints()
-    self.assertEqual(th.input_types, ((Any, ), {}))
-    self.assertEqual(th.output_types, ((str, ), {}))
-
-  def test_annotations_without_output_internal_type(self):
-    class MyPTransform(PTransform):
-      def expand(self, pcoll: PCollection[int]) -> PCollection:
-        return pcoll | Map(lambda num: str(num))
-
-    th = MyPTransform().get_type_hints()
-    self.assertEqual(th.input_types, ((int, ), {}))
-    self.assertEqual(th.output_types, ((Any, ), {}))
-
-  def test_annotations_without_any_internal_type(self):
-    class MyPTransform(PTransform):
-      def expand(self, pcoll: PCollection) -> PCollection:
-        return pcoll | Map(lambda num: str(num))
-
-    th = MyPTransform().get_type_hints()
-    self.assertEqual(th.input_types, ((Any, ), {}))
-    self.assertEqual(th.output_types, ((Any, ), {}))
-
-  def test_annotations_without_input_typehint(self):
-    class MyPTransform(PTransform):
-      def expand(self, pcoll) -> PCollection[str]:
-        return pcoll | Map(lambda num: str(num))
-
-    th = MyPTransform().get_type_hints()
-    self.assertEqual(th.input_types, ((Any, ), {}))
-    self.assertEqual(th.output_types, ((str, ), {}))
-
-  def test_annotations_without_output_typehint(self):
-    class MyPTransform(PTransform):
-      def expand(self, pcoll: PCollection[int]):
-        return pcoll | Map(lambda num: str(num))
-
-    th = MyPTransform().get_type_hints()
-    self.assertEqual(th.input_types, ((int, ), {}))
-    self.assertEqual(th.output_types, ((Any, ), {}))
-
-  def test_annotations_without_any_typehints(self):
-    class MyPTransform(PTransform):
-      def expand(self, pcoll):
-        return pcoll | Map(lambda num: str(num))
-
-    th = MyPTransform().get_type_hints()
-    self.assertEqual(th.input_types, None)
-    self.assertEqual(th.output_types, None)
-
-  def test_annotations_with_pbegin(self):
-    class MyPTransform(PTransform):
-      def expand(self, pcoll: PBegin):
-        return pcoll | Map(lambda num: str(num))
-
-    th = MyPTransform().get_type_hints()
-    self.assertEqual(th.input_types, ((Any, ), {}))
-    self.assertEqual(th.output_types, ((Any, ), {}))
-
-  def test_annotations_with_pdone(self):
-    class MyPTransform(PTransform):
-      def expand(self, pcoll) -> PDone:
-        return pcoll | Map(lambda num: str(num))
-
-    th = MyPTransform().get_type_hints()
-    self.assertEqual(th.input_types, ((Any, ), {}))
-    self.assertEqual(th.output_types, ((Any, ), {}))
-
-  def test_annotations_with_none_input(self):
-    class MyPTransform(PTransform):
-      def expand(self, pcoll: None) -> PCollection[str]:
-        return pcoll | Map(lambda num: str(num))
-
-    error_str = (
-        r'This input type hint will be ignored and not used for '
-        r'type-checking purposes. Typically, input type hints for a '
-        r'PTransform are single (or nested) types wrapped by a '
-        r'PCollection, or PBegin. Got: {} instead.'.format(None))
-
-    with self.assertLogs(level='WARN') as log:
-      th = MyPTransform().get_type_hints()
-      self.assertIn(error_str, log.output[0])
-      self.assertEqual(th.input_types, None)
-      self.assertEqual(th.output_types, ((str, ), {}))
-
-  def test_annotations_with_none_output(self):
-    class MyPTransform(PTransform):
-      def expand(self, pcoll) -> None:
-        return pcoll | Map(lambda num: str(num))
-
-    th = MyPTransform().get_type_hints()
-    self.assertEqual(th.input_types, ((Any, ), {}))
-    self.assertEqual(th.output_types, ((Any, ), {}))
-
-  def test_annotations_with_arbitrary_output(self):
-    class MyPTransform(PTransform):
-      def expand(self, pcoll) -> str:
-        return pcoll | Map(lambda num: str(num))
-
-    th = MyPTransform().get_type_hints()
-    self.assertEqual(th.input_types, ((Any, ), {}))
-    self.assertEqual(th.output_types, None)
-
-  def test_annotations_with_arbitrary_input_and_output(self):
-    class MyPTransform(PTransform):
-      def expand(self, pcoll: int) -> str:
-        return pcoll | Map(lambda num: str(num))
-
-    input_error_str = (
-        r'This input type hint will be ignored and not used for '
-        r'type-checking purposes. Typically, input type hints for a '
-        r'PTransform are single (or nested) types wrapped by a '
-        r'PCollection, or PBegin. Got: {} instead.'.format(int))
-
-    output_error_str = (
-        r'This output type hint will be ignored and not used for '
-        r'type-checking purposes. Typically, output type hints for a '
-        r'PTransform are single (or nested) types wrapped by a '
-        r'PCollection, PDone, or None. Got: {} instead.'.format(str))
-
-    with self.assertLogs(level='WARN') as log:
-      th = MyPTransform().get_type_hints()
-      self.assertIn(input_error_str, log.output[0])
-      self.assertIn(output_error_str, log.output[1])
-      self.assertEqual(th.input_types, None)
-      self.assertEqual(th.output_types, None)
-
-  def test_typing_module_annotations_are_converted_to_beam_annotations(self):
-    class MyPTransform(PTransform):
-      def expand(
-          self, pcoll: PCollection[typing.Dict[str, str]]
-      ) -> PCollection[typing.Dict[str, str]]:
-        return pcoll
-
-    th = MyPTransform().get_type_hints()
-    self.assertEqual(th.input_types, ((typehints.Dict[str, str], ), {}))
-    self.assertEqual(th.input_types, ((typehints.Dict[str, str], ), {}))
-
-  def test_nested_typing_annotations_are_converted_to_beam_annotations(self):
-    class MyPTransform(PTransform):
-      def expand(self, pcoll:
-         PCollection[typing.Union[int, typing.Any, typing.Dict[str, float]]]) \
-      -> PCollection[typing.Union[int, typing.Any, typing.Dict[str, float]]]:
-        return pcoll
-
-    th = MyPTransform().get_type_hints()
-    self.assertEqual(
-        th.input_types,
-        ((typehints.Union[int, typehints.Any, typehints.Dict[str,
-                                                             float]], ), {}))
-    self.assertEqual(
-        th.input_types,
-        ((typehints.Union[int, typehints.Any, typehints.Dict[str,
-                                                             float]], ), {}))
-
-  def test_mixed_annotations_are_converted_to_beam_annotations(self):
-    class MyPTransform(PTransform):
-      def expand(self, pcoll: typing.Any) -> typehints.Any:
-        return pcoll
-
-    th = MyPTransform().get_type_hints()
-    self.assertEqual(th.input_types, ((typehints.Any, ), {}))
-    self.assertEqual(th.input_types, ((typehints.Any, ), {}))
-
-
-if __name__ == '__main__':
-  unittest.main()

Reply via email to