claudevdm commented on code in PR #37434:
URL: https://github.com/apache/beam/pull/37434#discussion_r2743883530
##########
sdks/python/apache_beam/typehints/decorators.py:
##########
@@ -182,6 +187,140 @@ def disable_type_annotations():
TRACEBACK_LIMIT = 5
+def _tag_and_type(t):
+ """Extract tag name and value type from TaggedOutput[Literal['tag'], Type].
+
+ Returns raw Python types - conversion to beam types happens in
+ _extract_output_types.
+ """
+ args = get_args(t)
+ if len(args) != 2:
+ raise TypeError(
+ f"TaggedOutput expects 2 type parameters, got {len(args)}: {t}")
+
+ literal_type, value_type = args
+
+ if get_origin(literal_type) is not Literal:
+ raise TypeError(
+ f"First type parameter of TaggedOutput must be Literal['tag_name'], "
+ f"got {literal_type}. Example: TaggedOutput[Literal['errors'], str]")
+
+ tag_string = get_args(literal_type)[0]
+ return tag_string, value_type
+
+
+def _contains_tagged_output(t):
+ """Check if type contains TaggedOutput at a meaningful position.
+
+ TaggedOutput only makes sense in these patterns:
+ - TaggedOutput[...]
+ - X | TaggedOutput[...]
+ - Iterable[TaggedOutput[...]]
+ - Iterable[X | TaggedOutput[...]]
+ """
+ def _is_tagged(typ):
+ return get_origin(typ) is TaggedOutput or typ is TaggedOutput
+
+ # TaggedOutput[...]
+ if _is_tagged(t):
+ return True
+
+ origin = get_origin(t)
+ args = get_args(t)
+
+ # X | TaggedOutput[...]
+ if origin is Union:
+ return any(_is_tagged(arg) for arg in args)
+
+ # Iterable[...]
+ if origin is collections.abc.Iterable and len(args) == 1:
+ inner = args[0]
+ # Iterable[TaggedOutput[...]]
+ if _is_tagged(inner):
+ return True
+ # Iterable[X | TaggedOutput[...]]
+ if get_origin(inner) is Union:
+ return any(_is_tagged(arg) for arg in get_args(inner))
+
+ return False
+
+
+def _extract_main_and_tagged(t):
+ """Extract main type and tagged types from a type annotation.
+
+ Returns:
+ (main_type, tagged_dict) where main_type is the type without TaggedOutput
+ annotations (or None if no main type), and tagged_dict maps tag names to
+ their types.
+ """
+ if get_origin(t) is TaggedOutput:
+ tag, typ = _tag_and_type(t)
+ return None, {tag: typ}
+
+ if t is TaggedOutput:
+ raise TypeError(
Review Comment:
Do you think an update compat flag is necessary? If the coder changed from
FastPrimitivesCoder to IntCoder, then updating a pipeline will fail. The user
will just have to change their typehints to -> Any and then the update should
work.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]