Polber commented on code in PR #30368:
URL: https://github.com/apache/beam/pull/30368#discussion_r1504853883


##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -550,6 +551,50 @@ def extract_expr(name, v):
   return pcoll | sql_transform_constructor(query)
 
 
[email protected]_fn
+def _Split(
+    pcoll,
+    outputs: List[str],
+    destination: Union[str, Dict[str, str]],
+    unknown_output: Optional[str] = None,
+    error_handling: Optional[Mapping[str, Any]] = None,
+    language: Optional[str] = 'generic'):
+  split_fn = _as_callable_for_pcoll(pcoll, destination, 'destination', 
language)

Review Comment:
   This should theoretically allow callables, and files, right? Maybe adding 
that syntax to the doc and a couple tests could be useful.



##########
website/www/site/content/en/documentation/sdks/yaml-udf.md:
##########
@@ -207,6 +207,73 @@ criteria. This can be accomplished with a `Filter` 
transform, e.g.
     keep: "col2 > 0"
 ```
 
+## Splitting
+
+It can also be useful to send different elements to different places
+(similar to what is done with side outputs in other SDKs).
+While this can be done with a set of `Filter` operations, if every
+element has a single destination it can be more natural to use a `Split`
+transform instead which send every element to a unique output.
+For example, this will send all elements where `col1` is equal to `"a"` to the
+output `Split.a`.
+
+```
+- type: Split
+  input: input
+  config:
+    destination: col1
+    outputs: ['a', 'b', 'c']
+
+- type: SomeTransform
+  input: Split.a
+  config:
+    param: ...
+
+- type: AnotherTransform
+  input: Split.b
+  config:
+    param: ...
+```
+
+One can also specify the destination as a function, e.g.
+
+```
+- type: Split
+  input: input
+  config:
+    language: python
+    destination: "'even' if col2 % 2 == 0 else 'odd'"
+    outputs: ['even', 'odd']
+```
+
+One can optionally provide a catch-all output which will capture all elements
+that are not in the named outputs (which would otherwise be an error):
+
+```
+- type: Split
+  input: input
+  config:
+    destination: col1
+    outputs: ['a', 'b', 'c']
+    unknown_output: 'other'
+```
+
+To send elements to multiple (or no) outputs, one could use an iterable column
+and proceed the `Split` with an `Explode`.

Review Comment:
   nit: proceed -> precede?



##########
sdks/python/apache_beam/yaml/yaml_mapping.py:
##########
@@ -550,6 +551,50 @@ def extract_expr(name, v):
   return pcoll | sql_transform_constructor(query)
 
 
[email protected]_fn
+def _Split(
+    pcoll,
+    outputs: List[str],
+    destination: Union[str, Dict[str, str]],
+    unknown_output: Optional[str] = None,
+    error_handling: Optional[Mapping[str, Any]] = None,
+    language: Optional[str] = 'generic'):
+  split_fn = _as_callable_for_pcoll(pcoll, destination, 'destination', 
language)
+  error_output = error_handling['output'] if error_handling else None
+  if error_output in outputs:
+    raise ValueError(
+        f'Error handling output "{error_output}" '
+        f'cannot be among the listed outputs {outputs}')
+  T = TypeVar('T')
+
+  def split(element):
+    tag = split_fn(element)
+    if tag not in outputs:
+      if unknown_output:
+        tag = unknown_output
+      else:
+        raise ValueError(f'Unknown output name for destination "{tag}"')
+    return beam.pvalue.TaggedOutput(tag, element)
+
+  output_set = set(outputs)
+  if unknown_output:
+    output_set.add(unknown_output)
+  if error_output:
+    output_set.add(error_output)
+  mapping_transform = beam.Map(split)
+  if error_output:
+    mapping_transform = mapping_transform.with_exception_handling(
+        **exception_handling_args(error_handling))
+  else:
+    mapping_transform = mapping_transform.with_outputs(*output_set)
+  splits = pcoll | mapping_transform.with_input_types(T).with_output_types(T)
+  result = {out: getattr(splits, out) for out in output_set}
+  if error_output:
+    result[
+        error_output] = result[error_output] | _map_errors_to_standard_format()

Review Comment:
   nit: couldn't you still use the 
`@maybe_with_exception_handling_transform_fn` decorator? Just also keep the 
`error_handling` parameter to this function so the check at the top can still 
be done.



##########
website/www/site/content/en/documentation/sdks/yaml-udf.md:
##########
@@ -207,6 +207,73 @@ criteria. This can be accomplished with a `Filter` 
transform, e.g.
     keep: "col2 > 0"
 ```
 
+## Splitting
+
+It can also be useful to send different elements to different places
+(similar to what is done with side outputs in other SDKs).
+While this can be done with a set of `Filter` operations, if every
+element has a single destination it can be more natural to use a `Split`
+transform instead which send every element to a unique output.
+For example, this will send all elements where `col1` is equal to `"a"` to the
+output `Split.a`.
+
+```
+- type: Split
+  input: input
+  config:
+    destination: col1
+    outputs: ['a', 'b', 'c']
+
+- type: SomeTransform
+  input: Split.a
+  config:
+    param: ...
+
+- type: AnotherTransform
+  input: Split.b
+  config:
+    param: ...
+```
+
+One can also specify the destination as a function, e.g.
+
+```
+- type: Split
+  input: input
+  config:
+    language: python
+    destination: "'even' if col2 % 2 == 0 else 'odd'"
+    outputs: ['even', 'odd']
+```
+
+One can optionally provide a catch-all output which will capture all elements
+that are not in the named outputs (which would otherwise be an error):
+
+```
+- type: Split
+  input: input
+  config:
+    destination: col1
+    outputs: ['a', 'b', 'c']
+    unknown_output: 'other'
+```
+
+To send elements to multiple (or no) outputs, one could use an iterable column
+and proceed the `Split` with an `Explode`.

Review Comment:
   Also, what is this example providing? The other examples already display how 
to send elements to multiple outputs - this one just applies an explode to a 
presumably iterable column, but I don't see how this example provides value as 
a unique use-case.



##########
website/www/site/content/en/documentation/sdks/yaml-udf.md:
##########
@@ -207,6 +207,73 @@ criteria. This can be accomplished with a `Filter` 
transform, e.g.
     keep: "col2 > 0"
 ```
 
+## Splitting
+
+It can also be useful to send different elements to different places
+(similar to what is done with side outputs in other SDKs).
+While this can be done with a set of `Filter` operations, if every
+element has a single destination it can be more natural to use a `Split`
+transform instead which send every element to a unique output.
+For example, this will send all elements where `col1` is equal to `"a"` to the
+output `Split.a`.
+
+```
+- type: Split
+  input: input
+  config:
+    destination: col1
+    outputs: ['a', 'b', 'c']
+
+- type: SomeTransform
+  input: Split.a
+  config:
+    param: ...
+
+- type: AnotherTransform
+  input: Split.b
+  config:
+    param: ...
+```

Review Comment:
   I'm not sure this syntax is super intuitive. The `destination` parameter in 
this case is more like the source column, and the outputs double as comparison 
values - although this is not the case when a function is defined.
   
   Something more like this for the shorthand
   ```
   - type: Split
     input: input
     config:
       on: col1
       outputs: ['a', 'b', 'c']
   ```
   
   Seems more intuitive (although I am not fixed on those exact parameter names 
per se), but it does arguably add complexity by having different parameter 
names.
   
   ---
   
   Also this works for strings, but would it work for say, booleans? or ints?
   `input: Split.true`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to