claudevdm commented on code in PR #37434:
URL: https://github.com/apache/beam/pull/37434#discussion_r2738351073
##########
sdks/python/apache_beam/typehints/decorators.py:
##########
@@ -182,6 +187,140 @@ def disable_type_annotations():
TRACEBACK_LIMIT = 5
+def _tag_and_type(t):
+ """Extract tag name and value type from TaggedOutput[Literal['tag'], Type].
+
+ Returns raw Python types - conversion to beam types happens in
+ _extract_output_types.
+ """
+ args = get_args(t)
+ if len(args) != 2:
+ raise TypeError(
+ f"TaggedOutput expects 2 type parameters, got {len(args)}: {t}")
+
+ literal_type, value_type = args
+
+ if get_origin(literal_type) is not Literal:
+ raise TypeError(
+ f"First type parameter of TaggedOutput must be Literal['tag_name'], "
+ f"got {literal_type}. Example: TaggedOutput[Literal['errors'], str]")
+
+ tag_string = get_args(literal_type)[0]
+ return tag_string, value_type
+
+
+def _contains_tagged_output(t):
+ """Check if type contains TaggedOutput at a meaningful position.
+
+ TaggedOutput only makes sense in these patterns:
+ - TaggedOutput[...]
+ - X | TaggedOutput[...]
+ - Iterable[TaggedOutput[...]]
+ - Iterable[X | TaggedOutput[...]]
+ """
+ def _is_tagged(typ):
+ return get_origin(typ) is TaggedOutput or typ is TaggedOutput
+
+ # TaggedOutput[...]
+ if _is_tagged(t):
+ return True
+
+ origin = get_origin(t)
+ args = get_args(t)
+
+ # X | TaggedOutput[...]
+ if origin is Union:
+ return any(_is_tagged(arg) for arg in args)
+
+ # Iterable[...]
+ if origin is collections.abc.Iterable and len(args) == 1:
+ inner = args[0]
+ # Iterable[TaggedOutput[...]]
+ if _is_tagged(inner):
+ return True
+ # Iterable[X | TaggedOutput[...]]
+ if get_origin(inner) is Union:
+ return any(_is_tagged(arg) for arg in get_args(inner))
+
+ return False
+
+
+def _extract_main_and_tagged(t):
+ """Extract main type and tagged types from a type annotation.
+
+ Returns:
+ (main_type, tagged_dict) where main_type is the type without TaggedOutput
+ annotations (or None if no main type), and tagged_dict maps tag names to
+ their types.
+ """
+ if get_origin(t) is TaggedOutput:
+ tag, typ = _tag_and_type(t)
+ return None, {tag: typ}
+
+ if t is TaggedOutput:
+ raise TypeError(
Review Comment:
Actually, should this fail or just warn? I saw an issue #26493 where someone
is using typehint -> Iterable[Union[Dict[str, Any], pvalue.TaggedOutput]]:
Their pipeline will break until they fix the typehint. Maybe we should
rather warn that the tagged typehints cannot be parsed instead of throwing
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]