Polber commented on code in PR #30368: URL: https://github.com/apache/beam/pull/30368#discussion_r1506168919
########## sdks/python/apache_beam/yaml/yaml_mapping.py: ########## @@ -550,6 +551,50 @@ def extract_expr(name, v): return pcoll | sql_transform_constructor(query) +@beam.ptransform.ptransform_fn +def _Split( + pcoll, + outputs: List[str], + destination: Union[str, Dict[str, str]], + unknown_output: Optional[str] = None, + error_handling: Optional[Mapping[str, Any]] = None, + language: Optional[str] = 'generic'): + split_fn = _as_callable_for_pcoll(pcoll, destination, 'destination', language) + error_output = error_handling['output'] if error_handling else None + if error_output in outputs: + raise ValueError( + f'Error handling output "{error_output}" ' + f'cannot be among the listed outputs {outputs}') + T = TypeVar('T') + + def split(element): + tag = split_fn(element) + if tag not in outputs: + if unknown_output: + tag = unknown_output + else: + raise ValueError(f'Unknown output name for destination "{tag}"') + return beam.pvalue.TaggedOutput(tag, element) + + output_set = set(outputs) + if unknown_output: + output_set.add(unknown_output) + if error_output: + output_set.add(error_output) + mapping_transform = beam.Map(split) + if error_output: + mapping_transform = mapping_transform.with_exception_handling( + **exception_handling_args(error_handling)) + else: + mapping_transform = mapping_transform.with_outputs(*output_set) + splits = pcoll | mapping_transform.with_input_types(T).with_output_types(T) + result = {out: getattr(splits, out) for out in output_set} + if error_output: + result[ + error_output] = result[error_output] | _map_errors_to_standard_format() Review Comment: Ah ok fair enough. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
