damccorm commented on code in PR #35672: URL: https://github.com/apache/beam/pull/35672#discussion_r2231730140
########## sdks/python/apache_beam/yaml/yaml_provider.py: ########## @@ -932,6 +932,129 @@ def __init__(self): # pylint: disable=useless-parent-delegation super().__init__() + def _unify_field_types(self, existing_type, field_type): + """Unify two field types, handling Optional and List types.""" + # Extract inner types from Optional if needed + existing_inner = ( + existing_type.__args__[0] if hasattr(existing_type, '__args__') and + len(existing_type.__args__) == 1 else existing_type) + field_inner = ( + field_type.__args__[0] if hasattr(field_type, '__args__') and + len(field_type.__args__) == 1 else field_type) + + # Handle type unification more carefully + if existing_inner == Any or field_inner == Any: + return Optional[Any] + elif existing_inner == field_inner: + return Optional[existing_inner] Review Comment: Will this logic unify `Iterable[str]`, `str` to `Optional[str]` since `Iterable` also has `__args__` of length 1? I think we want to actually check if the outer type is `Optional`. ########## sdks/python/apache_beam/yaml/yaml_provider.py: ########## @@ -932,6 +932,129 @@ def __init__(self): # pylint: disable=useless-parent-delegation super().__init__() + def _unify_field_types(self, existing_type, field_type): + """Unify two field types, handling Optional and List types.""" + # Extract inner types from Optional if needed + existing_inner = ( + existing_type.__args__[0] if hasattr(existing_type, '__args__') and + len(existing_type.__args__) == 1 else existing_type) + field_inner = ( + field_type.__args__[0] if hasattr(field_type, '__args__') and + len(field_type.__args__) == 1 else field_type) + + # Handle type unification more carefully + if existing_inner == Any or field_inner == Any: + return Optional[Any] + elif existing_inner == field_inner: + return Optional[existing_inner] + else: + # Check for list types and prioritize them over other types + from apache_beam.typehints import typehints as th + existing_is_list = ( + hasattr(existing_inner, 
'__origin__') and + existing_inner.__origin__ in (list, th.List)) + field_is_list = ( + hasattr(field_inner, '__origin__') and + field_inner.__origin__ in (list, th.List)) + + if existing_is_list and field_is_list: + # Both are list types, unify their element types + existing_elem = existing_inner.__args__[ + 0] if existing_inner.__args__ else Any + field_elem = field_inner.__args__[0] if field_inner.__args__ else Any + if existing_elem == field_elem: + return Optional[th.List[existing_elem]] + else: + return Optional[th.List[Any]] + elif existing_is_list: + # Existing is list, keep it as list type + return Optional[existing_inner] + elif field_is_list: + # New field is list, use list type + return Optional[field_inner] + else: + # Neither is a list, use Any to avoid unsupported Union + # types in schema translation + return Optional[Any] + + def _merge_schemas(self, pcolls): + """Merge schemas from multiple PCollections to create a unified schema. + + This function creates a unified schema that contains all fields from all + input PCollections. Fields are made optional to handle missing values. 
+ """ + from apache_beam.typehints.schemas import named_fields_from_element_type + + # Collect all schemas + schemas = [] + for pcoll in pcolls: + if hasattr(pcoll, 'element_type') and pcoll.element_type: + try: + fields = named_fields_from_element_type(pcoll.element_type) + schemas.append(dict(fields)) + except (ValueError, TypeError): + # If we can't extract schema, skip this PCollection + continue + + if not schemas: + return None + + # Merge all field names and types, making them optional + all_fields = {} + for schema in schemas: + for field_name, field_type in schema.items(): + if field_name in all_fields: + # If field exists with different type, use Union + existing_type = all_fields[field_name] + if existing_type != field_type: + all_fields[field_name] = self._unify_field_types( + existing_type, field_type) + else: + # Make field optional since not all PCollections may have it + all_fields[field_name] = Optional[field_type] Review Comment: Could we keep track of when one of these schema difference conditions is hit and warn? 
########## sdks/python/apache_beam/yaml/yaml_provider.py: ########## @@ -932,6 +932,129 @@ def __init__(self): # pylint: disable=useless-parent-delegation super().__init__() + def _unify_field_types(self, existing_type, field_type): + """Unify two field types, handling Optional and List types.""" + # Extract inner types from Optional if needed + existing_inner = ( + existing_type.__args__[0] if hasattr(existing_type, '__args__') and + len(existing_type.__args__) == 1 else existing_type) + field_inner = ( + field_type.__args__[0] if hasattr(field_type, '__args__') and + len(field_type.__args__) == 1 else field_type) + + # Handle type unification more carefully + if existing_inner == Any or field_inner == Any: + return Optional[Any] + elif existing_inner == field_inner: + return Optional[existing_inner] + else: + # Check for list types and prioritize them over other types + from apache_beam.typehints import typehints as th + existing_is_list = ( + hasattr(existing_inner, '__origin__') and + existing_inner.__origin__ in (list, th.List)) + field_is_list = ( + hasattr(field_inner, '__origin__') and + field_inner.__origin__ in (list, th.List)) + + if existing_is_list and field_is_list: + # Both are list types, unify their element types + existing_elem = existing_inner.__args__[ + 0] if existing_inner.__args__ else Any + field_elem = field_inner.__args__[0] if field_inner.__args__ else Any + if existing_elem == field_elem: + return Optional[th.List[existing_elem]] + else: + return Optional[th.List[Any]] + elif existing_is_list: + # Existing is list, keep it as list type + return Optional[existing_inner] + elif field_is_list: + # New field is list, use list type + return Optional[field_inner] + else: + # Neither is a list, use Any to avoid unsupported Union + # types in schema translation + return Optional[Any] + + def _merge_schemas(self, pcolls): + """Merge schemas from multiple PCollections to create a unified schema. 
+ + This function creates a unified schema that contains all fields from all + input PCollections. Fields are made optional to handle missing values. + """ + from apache_beam.typehints.schemas import named_fields_from_element_type + + # Collect all schemas + schemas = [] + for pcoll in pcolls: + if hasattr(pcoll, 'element_type') and pcoll.element_type: + try: + fields = named_fields_from_element_type(pcoll.element_type) + schemas.append(dict(fields)) + except (ValueError, TypeError): + # If we can't extract schema, skip this PCollection + continue + + if not schemas: + return None + + # Merge all field names and types, making them optional + all_fields = {} + for schema in schemas: Review Comment: Do we actually need to handle nested structures? Could we just say given: ``` pcoll1: {'foo': TypeA} pcoll2: {'foo': TypeB} outPcoll: {'foo': Union[TypeA, TypeB]} ``` and ignore the nested representations? ########## sdks/python/apache_beam/yaml/yaml_provider.py: ########## @@ -932,6 +932,129 @@ def __init__(self): # pylint: disable=useless-parent-delegation super().__init__() + def _unify_field_types(self, existing_type, field_type): + """Unify two field types, handling Optional and List types.""" + # Extract inner types from Optional if needed + existing_inner = ( + existing_type.__args__[0] if hasattr(existing_type, '__args__') and + len(existing_type.__args__) == 1 else existing_type) + field_inner = ( + field_type.__args__[0] if hasattr(field_type, '__args__') and + len(field_type.__args__) == 1 else field_type) + + # Handle type unification more carefully + if existing_inner == Any or field_inner == Any: + return Optional[Any] + elif existing_inner == field_inner: + return Optional[existing_inner] + else: + # Check for list types and prioritize them over other types + from apache_beam.typehints import typehints as th + existing_is_list = ( + hasattr(existing_inner, '__origin__') and + existing_inner.__origin__ in (list, th.List)) + field_is_list = ( + 
hasattr(field_inner, '__origin__') and + field_inner.__origin__ in (list, th.List)) + + if existing_is_list and field_is_list: + # Both are list types, unify their element types + existing_elem = existing_inner.__args__[ + 0] if existing_inner.__args__ else Any + field_elem = field_inner.__args__[0] if field_inner.__args__ else Any + if existing_elem == field_elem: + return Optional[th.List[existing_elem]] + else: + return Optional[th.List[Any]] + elif existing_is_list: + # Existing is list, keep it as list type + return Optional[existing_inner] + elif field_is_list: + # New field is list, use list type + return Optional[field_inner] Review Comment: Ah, I see we're trying to avoid Union types. Probably this just needs to be encoded as `Any` then, right? ########## sdks/python/apache_beam/yaml/yaml_provider.py: ########## @@ -932,6 +932,129 @@ def __init__(self): # pylint: disable=useless-parent-delegation super().__init__() + def _unify_field_types(self, existing_type, field_type): + """Unify two field types, handling Optional and List types.""" + # Extract inner types from Optional if needed + existing_inner = ( + existing_type.__args__[0] if hasattr(existing_type, '__args__') and + len(existing_type.__args__) == 1 else existing_type) + field_inner = ( + field_type.__args__[0] if hasattr(field_type, '__args__') and + len(field_type.__args__) == 1 else field_type) + + # Handle type unification more carefully + if existing_inner == Any or field_inner == Any: + return Optional[Any] + elif existing_inner == field_inner: + return Optional[existing_inner] + else: + # Check for list types and prioritize them over other types + from apache_beam.typehints import typehints as th + existing_is_list = ( + hasattr(existing_inner, '__origin__') and + existing_inner.__origin__ in (list, th.List)) + field_is_list = ( + hasattr(field_inner, '__origin__') and + field_inner.__origin__ in (list, th.List)) + + if existing_is_list and field_is_list: + # Both are list types, unify their 
element types + existing_elem = existing_inner.__args__[ + 0] if existing_inner.__args__ else Any + field_elem = field_inner.__args__[0] if field_inner.__args__ else Any + if existing_elem == field_elem: + return Optional[th.List[existing_elem]] + else: + return Optional[th.List[Any]] + elif existing_is_list: + # Existing is list, keep it as list type + return Optional[existing_inner] + elif field_is_list: + # New field is list, use list type + return Optional[field_inner] Review Comment: Why is this not just `Optional[Union[existing_inner, field_inner]]`? Isn't either list or single element valid? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: github-unsubscr...@beam.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org