This is an automated email from the ASF dual-hosted git repository.
jrmccluskey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new b9d48fa1750 Fix type checking failure with Python 3.10+ union pipe syntax (int | None) (#37645)
b9d48fa1750 is described below
commit b9d48fa1750f6427cb86fa1de9d52e4d1849a433
Author: M Junaid Shaukat <[email protected]>
AuthorDate: Fri Feb 20 00:47:01 2026 +0500
Fix type checking failure with Python 3.10+ union pipe syntax (int | None) (#37645)
* Fix type checking failure with Python 3.10+ union pipe syntax (int | None)
The normalize() function did not handle types.UnionType, causing
is_consistent_with() to fall through to issubclass() with a non-class
argument. Route types.UnionType through convert_to_beam_type() which
already knows how to convert it to Beam's UnionConstraint.
Fixes #36592
* Fix yapf formatting in test file
* Address review: merge UnionType check into existing elif, remove issue
link comments
---
sdks/python/apache_beam/typehints/typehints.py | 7 ++++---
.../python/apache_beam/typehints/typehints_test.py | 24 ++++++++++++++++++++++
2 files changed, 28 insertions(+), 3 deletions(-)
diff --git a/sdks/python/apache_beam/typehints/typehints.py b/sdks/python/apache_beam/typehints/typehints.py
index d0dfaec23af..f429935c3a0 100644
--- a/sdks/python/apache_beam/typehints/typehints.py
+++ b/sdks/python/apache_beam/typehints/typehints.py
@@ -1462,9 +1462,10 @@ def normalize(x, none_as_type=False):
# Convert bare builtin types to correct type hints directly
elif x in _KNOWN_PRIMITIVE_TYPES:
return _KNOWN_PRIMITIVE_TYPES[x]
- elif getattr(x, '__module__',
- None) in ('typing', 'collections', 'collections.abc') or getattr(
- x, '__origin__', None) in _KNOWN_PRIMITIVE_TYPES:
+ elif isinstance(x, types.UnionType) or getattr(
+ x, '__module__',
+ None) in ('typing', 'collections', 'collections.abc') or getattr(
+ x, '__origin__', None) in _KNOWN_PRIMITIVE_TYPES:
beam_type = native_type_compatibility.convert_to_beam_type(x)
if beam_type != x:
# We were able to do the conversion.
diff --git a/sdks/python/apache_beam/typehints/typehints_test.py b/sdks/python/apache_beam/typehints/typehints_test.py
index cec83038008..1377bea6d56 100644
--- a/sdks/python/apache_beam/typehints/typehints_test.py
+++ b/sdks/python/apache_beam/typehints/typehints_test.py
@@ -1596,6 +1596,22 @@ class DecoratorHelpers(TypeHintTestCase):
self.assertFalse(is_consistent_with(Union[str, int], str))
self.assertFalse(is_consistent_with(str, NonBuiltInGeneric[str]))
+ def test_hint_helper_pipe_union(self):
+ pipe_union = int | None # pylint: disable=unsupported-binary-operation
+ typing_union = Union[int, None]
+ self.assertTrue(is_consistent_with(int, pipe_union))
+ self.assertTrue(is_consistent_with(type(None), pipe_union))
+ self.assertFalse(is_consistent_with(str, pipe_union))
+ self.assertTrue(
+ is_consistent_with(int, pipe_union) == is_consistent_with(
+ int, typing_union))
+ self.assertTrue(
+ is_consistent_with(str, pipe_union) == is_consistent_with(
+ str, typing_union))
+ pipe_union_2 = int | float # pylint: disable=unsupported-binary-operation
+ self.assertTrue(is_consistent_with(int, pipe_union_2))
+ self.assertTrue(is_consistent_with(float, pipe_union_2))
+
def test_positional_arg_hints(self):
self.assertEqual(typehints.Any, _positional_arg_hints('x', {}))
self.assertEqual(int, _positional_arg_hints('x', {'x': int}))
@@ -1934,6 +1950,14 @@ class TestPTransformAnnotations(unittest.TestCase):
native_type_compatibility.convert_to_beam_type(type_a),
native_type_compatibility.convert_to_beam_type(type_b))
+ def test_normalize_pipe_union(self):
+ pipe_union = int | None # pylint: disable=unsupported-binary-operation
+ normalized = typehints.normalize(pipe_union)
+ self.assertIsInstance(normalized, typehints.UnionConstraint)
+ pipe_union_2 = int | float # pylint: disable=unsupported-binary-operation
+ normalized_2 = typehints.normalize(pipe_union_2)
+ self.assertIsInstance(normalized_2, typehints.UnionConstraint)
+
class TestNonBuiltInGenerics(unittest.TestCase):
def test_no_error_thrown(self):