claudevdm commented on code in PR #37434:
URL: https://github.com/apache/beam/pull/37434#discussion_r2743904156


##########
sdks/python/apache_beam/typehints/decorators.py:
##########
@@ -182,6 +187,140 @@ def disable_type_annotations():
 TRACEBACK_LIMIT = 5
 
 
+def _tag_and_type(t):
+  """Extract tag name and value type from TaggedOutput[Literal['tag'], Type].
+
+  Returns raw Python types - conversion to beam types happens in
+  _extract_output_types.
+  """
+  args = get_args(t)
+  if len(args) != 2:
+    raise TypeError(
+        f"TaggedOutput expects 2 type parameters, got {len(args)}: {t}")
+
+  literal_type, value_type = args
+
+  if get_origin(literal_type) is not Literal:
+    raise TypeError(
+        f"First type parameter of TaggedOutput must be Literal['tag_name'], "
+        f"got {literal_type}. Example: TaggedOutput[Literal['errors'], str]")
+
+  tag_string = get_args(literal_type)[0]
+  return tag_string, value_type
+
+
+def _contains_tagged_output(t):
+  """Check if type contains TaggedOutput at a meaningful position.
+
+  TaggedOutput only makes sense in these patterns:
+  - TaggedOutput[...]
+  - X | TaggedOutput[...]
+  - Iterable[TaggedOutput[...]]
+  - Iterable[X | TaggedOutput[...]]
+  """
+  def _is_tagged(typ):
+    return get_origin(typ) is TaggedOutput or typ is TaggedOutput
+
+  # TaggedOutput[...]
+  if _is_tagged(t):
+    return True
+
+  origin = get_origin(t)
+  args = get_args(t)
+
+  # X | TaggedOutput[...]
+  if origin is Union:
+    return any(_is_tagged(arg) for arg in args)
+
+  # Iterable[...]
+  if origin is collections.abc.Iterable and len(args) == 1:
+    inner = args[0]
+    # Iterable[TaggedOutput[...]]
+    if _is_tagged(inner):
+      return True
+    # Iterable[X | TaggedOutput[...]]
+    if get_origin(inner) is Union:
+      return any(_is_tagged(arg) for arg in get_args(inner))
+
+  return False
+
+
+def _extract_main_and_tagged(t):
+  """Extract main type and tagged types from a type annotation.
+
+  Returns:
+    (main_type, tagged_dict) where main_type is the type without TaggedOutput
+    annotations (or None if no main type), and tagged_dict maps tag names to
+    their types.
+  """
+  if get_origin(t) is TaggedOutput:
+    tag, typ = _tag_and_type(t)
+    return None, {tag: typ}
+
+  if t is TaggedOutput:
+    raise TypeError(
+        "TaggedOutput in return type must include type parameters: "
+        "TaggedOutput[Literal['tag_name'], ValueType]")
+
+  if get_origin(t) is not Union:
+    return t, {}
+
+  main_types = []
+  tagged_types = {}
+  for arg in get_args(t):
+    if get_origin(arg) is TaggedOutput:
+      tag, typ = _tag_and_type(arg)
+      tagged_types[tag] = typ
+    elif arg is TaggedOutput:
+      raise TypeError(
+          "TaggedOutput in return type must include type parameters: "
+          "TaggedOutput[Literal['tag_name'], ValueType]")
+    else:
+      main_types.append(arg)
+
+  if len(main_types) == 0:
+    main_type = None
+  elif len(main_types) == 1:
+    main_type = main_types[0]
+  else:
+    main_type = Union[tuple(main_types)]
+
+  return main_type, tagged_types
+
+
+def _extract_output_types(return_annotation):
+  """Parse return annotation into (main_types, tagged_types).
+
+  For tagged outputs to be extracted from generator/iterator functions,
+  users must explicitly use Iterable[T | TaggedOutput[...]] as return type.
+
+  Returns raw Python types. Conversion to beam types happens in from_callable.
+  """
+  if return_annotation == inspect.Signature.empty:
+    return [Any], {}
+
+  # Early return if no TaggedOutput
+  if not _contains_tagged_output(return_annotation):

Review Comment:
   I added this to be extra conservative about not breaking anything. 
   
   Removed it an test failed where DoFn process -> None so fixed that case with 
a sentinal rather than None. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to