Polber commented on code in PR #30368:
URL: https://github.com/apache/beam/pull/30368#discussion_r1504853883

##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -550,6 +551,50 @@ def extract_expr(name, v):
   return pcoll | sql_transform_constructor(query)


+@beam.ptransform.ptransform_fn
+def _Split(
+    pcoll,
+    outputs: List[str],
+    destination: Union[str, Dict[str, str]],
+    unknown_output: Optional[str] = None,
+    error_handling: Optional[Mapping[str, Any]] = None,
+    language: Optional[str] = 'generic'):
+  split_fn = _as_callable_for_pcoll(pcoll, destination, 'destination', language)

Review Comment:
   This should theoretically allow callables, and files, right? Maybe adding that syntax to the doc and a couple tests could be useful.

##########
website/www/site/content/en/documentation/sdks/yaml-udf.md:
##########
@@ -207,6 +207,73 @@ criteria. This can be accomplished with a `Filter` transform, e.g.
     keep: "col2 > 0"
 ```

+## Splitting
+
+It can also be useful to send different elements to different places
+(similar to what is done with side outputs in other SDKs).
+While this can be done with a set of `Filter` operations, if every
+element has a single destination it can be more natural to use a `Split`
+transform instead which send every element to a unique output.
+For example, this will send all elements where `col1` is equal to `"a"` to the
+output `Split.a`.
+
+```
+- type: Split
+  input: input
+  config:
+    destination: col1
+    outputs: ['a', 'b', 'c']
+
+- type: SomeTransform
+  input: Split.a
+  config:
+    param: ...
+
+- type: AnotherTransform
+  input: Split.b
+  config:
+    param: ...
+```
+
+One can also specify the destination as a function, e.g.
+
+```
+- type: Split
+  input: input
+  config:
+    language: python
+    destination: "'even' if col2 % 2 == 0 else 'odd'"
+    outputs: ['even', 'odd']
+```
+
+One can optionally provide a catch-all output which will capture all elements
+that are not in the named outputs (which would otherwise be an error):
+
+```
+- type: Split
+  input: input
+  config:
+    destination: col1
+    outputs: ['a', 'b', 'c']
+    unknown_output: 'other'
+```
+
+To send elements to multiple (or no) outputs, one could use an iterable column
+and proceed the `Split` with an `Explode`.

Review Comment:
   nit: proceed -> precede?

##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -550,6 +551,50 @@ def extract_expr(name, v):
   return pcoll | sql_transform_constructor(query)


+@beam.ptransform.ptransform_fn
+def _Split(
+    pcoll,
+    outputs: List[str],
+    destination: Union[str, Dict[str, str]],
+    unknown_output: Optional[str] = None,
+    error_handling: Optional[Mapping[str, Any]] = None,
+    language: Optional[str] = 'generic'):
+  split_fn = _as_callable_for_pcoll(pcoll, destination, 'destination', language)
+  error_output = error_handling['output'] if error_handling else None
+  if error_output in outputs:
+    raise ValueError(
+        f'Error handling output "{error_output}" '
+        f'cannot be among the listed outputs {outputs}')
+  T = TypeVar('T')
+
+  def split(element):
+    tag = split_fn(element)
+    if tag not in outputs:
+      if unknown_output:
+        tag = unknown_output
+      else:
+        raise ValueError(f'Unknown output name for destination "{tag}"')
+    return beam.pvalue.TaggedOutput(tag, element)
+
+  output_set = set(outputs)
+  if unknown_output:
+    output_set.add(unknown_output)
+  if error_output:
+    output_set.add(error_output)
+  mapping_transform = beam.Map(split)
+  if error_output:
+    mapping_transform = mapping_transform.with_exception_handling(
+        **exception_handling_args(error_handling))
+  else:
+    mapping_transform = mapping_transform.with_outputs(*output_set)
+  splits = pcoll | mapping_transform.with_input_types(T).with_output_types(T)
+  result = {out: getattr(splits, out) for out in output_set}
+  if error_output:
+    result[
+        error_output] = result[error_output] | _map_errors_to_standard_format()

Review Comment:
   nit: couldn't you still use the `@maybe_with_exception_handling_transform_fn` decorator? Just also keep the `error_handling` parameter to this function so the check at the top can still be done.

##########
website/www/site/content/en/documentation/sdks/yaml-udf.md:
##########
@@ -207,6 +207,73 @@ criteria. This can be accomplished with a `Filter` transform, e.g.
     keep: "col2 > 0"
 ```

+## Splitting
+
+It can also be useful to send different elements to different places
+(similar to what is done with side outputs in other SDKs).
+While this can be done with a set of `Filter` operations, if every
+element has a single destination it can be more natural to use a `Split`
+transform instead which send every element to a unique output.
+For example, this will send all elements where `col1` is equal to `"a"` to the
+output `Split.a`.
+
+```
+- type: Split
+  input: input
+  config:
+    destination: col1
+    outputs: ['a', 'b', 'c']
+
+- type: SomeTransform
+  input: Split.a
+  config:
+    param: ...
+
+- type: AnotherTransform
+  input: Split.b
+  config:
+    param: ...
+```
+
+One can also specify the destination as a function, e.g.
+
+```
+- type: Split
+  input: input
+  config:
+    language: python
+    destination: "'even' if col2 % 2 == 0 else 'odd'"
+    outputs: ['even', 'odd']
+```
+
+One can optionally provide a catch-all output which will capture all elements
+that are not in the named outputs (which would otherwise be an error):
+
+```
+- type: Split
+  input: input
+  config:
+    destination: col1
+    outputs: ['a', 'b', 'c']
+    unknown_output: 'other'
+```
+
+To send elements to multiple (or no) outputs, one could use an iterable column
+and proceed the `Split` with an `Explode`.

Review Comment:
   Also, what is this example providing?
   The other examples already display how to send elements to multiple outputs - this one just applies an explode to a presumably iterable column, but I don't see how this example provides value as a unique use-case.

##########
website/www/site/content/en/documentation/sdks/yaml-udf.md:
##########
@@ -207,6 +207,73 @@ criteria. This can be accomplished with a `Filter` transform, e.g.
     keep: "col2 > 0"
 ```

+## Splitting
+
+It can also be useful to send different elements to different places
+(similar to what is done with side outputs in other SDKs).
+While this can be done with a set of `Filter` operations, if every
+element has a single destination it can be more natural to use a `Split`
+transform instead which send every element to a unique output.
+For example, this will send all elements where `col1` is equal to `"a"` to the
+output `Split.a`.
+
+```
+- type: Split
+  input: input
+  config:
+    destination: col1
+    outputs: ['a', 'b', 'c']
+
+- type: SomeTransform
+  input: Split.a
+  config:
+    param: ...
+
+- type: AnotherTransform
+  input: Split.b
+  config:
+    param: ...
+```

Review Comment:
   I'm not sure this syntax is super intuitive. The `destination` parameter in this case is more like the source column, and the outputs double as comparison values - although this is not the case when a function is defined.

   Something more like this for the shorthand
   ```
   - type: Split
     input: input
     config:
       on: col1
       outputs: ['a', 'b', 'c']
   ```
   Seems more intuitive (although I am not fixed on those exact parameter names per se), but it does arguably add complexity by having different parameter names.

   ---

   Also, this works for strings, but would it work for, say, booleans or ints? `input: Split.true`?
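The first comment's point about callables follows from the shape of the diff: the inner `split` only ever calls `split_fn(element)`, so routing works for anything that can be called on an element (how `_as_callable_for_pcoll` builds that callable from `destination` is not shown in these hunks). The plain-Python sketch below mirrors that per-element logic, including the `unknown_output` fallback and the `ValueError`. It is an illustration only, not Beam code: `route` and `split_all` are hypothetical helpers, and a dict of lists stands in for the tagged PCollections.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, Iterable, List, Optional


def route(element: Any,
          destination: Callable[[Any], Any],
          outputs: List[str],
          unknown_output: Optional[str] = None) -> Any:
  """Picks the output tag for one element, mirroring the diff's `split`."""
  tag = destination(element)
  if tag not in outputs:
    if unknown_output:
      tag = unknown_output
    else:
      raise ValueError(f'Unknown output name for destination "{tag}"')
  return tag


def split_all(elements: Iterable[Any],
              destination: Callable[[Any], Any],
              outputs: List[str],
              unknown_output: Optional[str] = None) -> Dict[Any, List[Any]]:
  """Groups elements by tag; a local stand-in for the tagged outputs."""
  grouped: Dict[Any, List[Any]] = defaultdict(list)
  for element in elements:
    tag = route(element, destination, outputs, unknown_output)
    grouped[tag].append(element)
  return dict(grouped)


rows = [
    {'col1': 'a', 'col2': 1},
    {'col1': 'b', 'col2': 2},
    {'col1': 'z', 'col2': 3},  # 'z' is not a declared output
]

# `destination: col1` behaves like a field lookup...
by_col1 = split_all(rows, lambda row: row['col1'], ['a', 'b', 'c'],
                    unknown_output='other')
# ...while the Python-expression example is just another callable.
by_parity = split_all(
    rows, lambda row: 'even' if row['col2'] % 2 == 0 else 'odd',
    ['even', 'odd'])

print({tag: [row['col2'] for row in group] for tag, group in by_col1.items()})
# {'a': [1], 'b': [2], 'other': [3]}
print({tag: [row['col2'] for row in group] for tag, group in by_parity.items()})
# {'odd': [1, 3], 'even': [2]}
```

Without `unknown_output`, the `'z'` row raises the same `ValueError` message the diff uses, which is the "otherwise be an error" case described in the docs.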
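On what the `Explode` paragraph adds: a `Split` on its own sends each element to exactly one output, whereas exploding an iterable column first lets a single input element reach several outputs, or none when its list is empty. The sketch below shows that combination in plain Python, assuming (as the docs' "multiple (or no) outputs" wording implies) that `Explode` emits one row per value, so an empty list yields no rows. `explode` and `split_on` are hypothetical helpers, not the Beam YAML transforms.

```python
from collections import defaultdict
from typing import Any, Dict, Iterable, List


def explode(rows: Iterable[Dict[str, Any]],
            field: str) -> List[Dict[str, Any]]:
  """Emits one row per value of `field`; a row with an empty list emits none."""
  return [{**row, field: value} for row in rows for value in row[field]]


def split_on(rows: Iterable[Dict[str, Any]],
             field: str,
             outputs: List[str]) -> Dict[str, List[Dict[str, Any]]]:
  """Sends each row to exactly one output, named by the value of `field`."""
  grouped: Dict[str, List[Dict[str, Any]]] = defaultdict(list)
  for row in rows:
    tag = row[field]
    if tag not in outputs:
      raise ValueError(f'Unknown output name for destination "{tag}"')
    grouped[tag].append(row)
  return dict(grouped)


rows = [
    {'id': 1, 'targets': ['a', 'b']},  # one input element, two outputs
    {'id': 2, 'targets': []},  # one input element, no output at all
    {'id': 3, 'targets': ['c']},
]
routed = split_on(explode(rows, 'targets'), 'targets', ['a', 'b', 'c'])
print({tag: [row['id'] for row in group] for tag, group in routed.items()})
# {'a': [1], 'b': [1], 'c': [3]}
```

Without the `explode` step, `split_on` would reject the list-valued `targets` with the `ValueError`, since a list never equals one of the declared output names.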
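On whether non-string destinations would work: the diff checks `tag not in outputs`, so when `outputs` are declared as strings (as the `List[str]` annotation suggests), a destination that evaluates to `True` or `1` never equals `'true'` or `'1'` and falls through to `unknown_output`, or raises. Python's `str(True)` is also `'True'`, not `'true'`. The sketch below shows the mismatch together with one possible normalization; `normalize_tag` is purely hypothetical and not part of the PR.

```python
from typing import Any, List


def matches_declared_output(tag: Any, outputs: List[str]) -> bool:
  """The membership test from the diff (`tag not in outputs`), un-negated."""
  return tag in outputs


def normalize_tag(tag: Any) -> str:
  """Hypothetical: spell bools the way YAML does, everything else via str()."""
  if isinstance(tag, bool):  # bool is checked explicitly; str(True) == 'True'
    return 'true' if tag else 'false'
  return str(tag)


outputs = ['true', 'false', '1', '2']
for raw in [True, False, 1, 2, 'true']:
  print(repr(raw), matches_declared_output(raw, outputs),
        normalize_tag(raw) in outputs)
# True False True
# False False True
# 1 False True
# 2 False True
# 'true' True True

print(str(True) in outputs)  # False: plain str() yields 'True'
```

How such a tag would then be referenced downstream (for example `Split.true` versus `Split.True`) depends on how Beam YAML resolves `input:` names, which this sketch does not model.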
