liferoad commented on code in PR #35672: URL: https://github.com/apache/beam/pull/35672#discussion_r2233128366
########## sdks/python/apache_beam/yaml/yaml_provider.py: ########## @@ -932,6 +932,129 @@ def __init__(self): # pylint: disable=useless-parent-delegation super().__init__() + def _unify_field_types(self, existing_type, field_type): + """Unify two field types, handling Optional and List types.""" + # Extract inner types from Optional if needed + existing_inner = ( + existing_type.__args__[0] if hasattr(existing_type, '__args__') and + len(existing_type.__args__) == 1 else existing_type) + field_inner = ( + field_type.__args__[0] if hasattr(field_type, '__args__') and + len(field_type.__args__) == 1 else field_type) + + # Handle type unification more carefully + if existing_inner == Any or field_inner == Any: + return Optional[Any] + elif existing_inner == field_inner: + return Optional[existing_inner] + else: + # Check for list types and prioritize them over other types + from apache_beam.typehints import typehints as th + existing_is_list = ( + hasattr(existing_inner, '__origin__') and + existing_inner.__origin__ in (list, th.List)) + field_is_list = ( + hasattr(field_inner, '__origin__') and + field_inner.__origin__ in (list, th.List)) + + if existing_is_list and field_is_list: + # Both are list types, unify their element types + existing_elem = existing_inner.__args__[ + 0] if existing_inner.__args__ else Any + field_elem = field_inner.__args__[0] if field_inner.__args__ else Any + if existing_elem == field_elem: + return Optional[th.List[existing_elem]] + else: + return Optional[th.List[Any]] + elif existing_is_list: + # Existing is list, keep it as list type + return Optional[existing_inner] + elif field_is_list: + # New field is list, use list type + return Optional[field_inner] + else: + # Neither is a list, use Any to avoid unsupported Union + # types in schema translation + return Optional[Any] + + def _merge_schemas(self, pcolls): + """Merge schemas from multiple PCollections to create a unified schema. 
+ + This function creates a unified schema that contains all fields from all + input PCollections. Fields are made optional to handle missing values. + """ + from apache_beam.typehints.schemas import named_fields_from_element_type + + # Collect all schemas + schemas = [] + for pcoll in pcolls: + if hasattr(pcoll, 'element_type') and pcoll.element_type: + try: + fields = named_fields_from_element_type(pcoll.element_type) + schemas.append(dict(fields)) + except (ValueError, TypeError): + # If we can't extract schema, skip this PCollection + continue + + if not schemas: + return None + + # Merge all field names and types, making them optional + all_fields = {} + for schema in schemas: Review Comment: `_unify_field_types`, for now at least, does not use `Union`. Whenever two types are different, it uses `Optional[Any]`. I have some concerns about how accurately we need to infer the schema (e.g., stop at the list level as you suggest, or just use the simplest approach, as my PR does). I also think we should support specifying the schema explicitly, in which case it would make no sense for us to unify the schemas with our own rules. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org