On Sat, Oct 19, 2024 at 8:01 AM XQ Hu via dev <dev@beam.apache.org> wrote:
>
> I probably missed something. Tried this toy example:
>
> pipeline:
>   transforms:
>     - type: Create
>       config:
>         elements: [1, 2, 3, 4, -1]
>     - type: MapToFields
>       input: Create
>       name: MapToFields_1
>       config:
>         language: python
>         fields:
>           element:
>             callable: |
>               import math
>               def process_num(row):
>                 return math.sqrt(row.element)
>         error_handling:
>           output: my_error_output
>     - type: LogForTesting
>       input: MapToFields_1
>     - type: MapToFields
>       input: MapToFields_1.my_error_output
>       name: MapToFields_2
>       config:
>         language: python
>         fields:
>           element:
>             callable: |
>               # return the raw element
>               def process_error_row(row):
>                 return row.element[0]
>     - type: LogForTesting
>       input: MapToFields_2
>
> It looks like MapToFields is good enough to get any information returned by 
> error_handling.

Yes, it's possible, but your MapToFields_2 needs to know (and
reproduce) the structure of the original element to reconstruct it.
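
For example, if the original rows had fields id and value (field
names here purely illustrative), MapToFields_2 would have to spell
out each one, roughly

- type: MapToFields
  input: MapToFields_1.my_error_output
  config:
    fields:
      id: element.id
      value: element.value

which gets tedious (and fragile) for wider schemas.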

> On Sat, Oct 19, 2024 at 2:55 AM Ahmed Abualsaud via dev <dev@beam.apache.org> 
> wrote:
>>
>> Another option is to add a second DLQ that outputs just the original rows, 
>> i.e. the user has the option to fetch failed rows with or without metadata.
>> It would take some work on our side to add this second DLQ to existing 
>> transforms, but that seems pretty straightforward.

Yeah. I would prefer to do it in such a way that one didn't have to
modify all existing (and future) transforms. Another downside is that
having two error outputs doesn't play as nicely with error handlers
(https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/transforms/errorhandling/ErrorHandler.html
).
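
For concreteness, I imagine that would be declared something like

error_handling:
  output: my_error_output         # errors with metadata, as today
  raw_output: my_failed_elements  # just the original rows

where the attribute name raw_output is purely illustrative.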

Yet another option would be to add a YAML StripErrorMetadata
transform, as this is the place where it's not convenient to just do
a map.
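
In the toy pipeline above its usage would look something like this
(assuming the transform needs no config):

- type: StripErrorMetadata
  input: MapToFields_1.my_error_output
- type: LogForTesting
  input: StripErrorMetadata

and the output would just be the failed elements with their original
schema.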

>> On Sat, Oct 19, 2024 at 1:03 AM Robert Bradshaw via dev 
>> <dev@beam.apache.org> wrote:
>>>
>>> I came across an interesting user report at
>>> https://github.com/apache/beam/issues/32866 which made me realize that
>>> while providing metadata about a bad element in the "bad records"
>>> output is useful, we don't make it easy to turn that output back into
>>> a PCollection of the original elements. The output schema contains the
>>> original element as well as metadata about what error occurred, and in
>>> an ordinary Beam pipeline one could easily apply a Map(lambda
>>> error_row: error_row.element), but YAML doesn't have Map, just
>>> MapToFields (primarily to be more schema friendly).
>>>
>>> There are a couple of options:
>>>
>>> (0) Leave things as they are. One can write
>>>
>>> type: MapToFields
>>> config:
>>>   fields:
>>>     fld1: element.fld1
>>>     fld2: element.fld2
>>>     ...
>>>
>>>
>>> This is of course a bit ugly as one needs to enumerate (and know) the
>>> set of original fields.
>>>
>>> (1a) Provide a special operation "Unnest" that takes a single field
>>> and emits it as the top-level element. This can of course result in
>>> unschema'd PCollections (which are supported, but generally don't play
>>> as well with the other operations, including xlang ones).
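>>>
>>> Usage might look something like this (the config key name here is
>>> just a guess, not an existing option):
>>>
>>> type: Unnest
>>> config:
>>>   field: element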
>>>
>>> (1b) Just provide a Map. This is a generalization of 1a, but on the
>>> other hand would be more prone to abuse.
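>>>
>>> For example, something like (the exact config spelling here is
>>> hypothetical)
>>>
>>> type: Map
>>> config:
>>>   language: python
>>>   fn: "lambda error_row: error_row.element"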
>>>
>>> (1c) We could spell this as
>>>
>>> type: MapToFields
>>> config:
>>>   fields:
>>>     *: element
>>>
>>> IIRC, we already have the special case of "*" in our join syntax, and
>>> we could re-use a bunch of the MapToFields infrastructure. But maybe
>>> it's too obscure?
>>>
>>> (2) Add an optional argument to error_handling to omit the metadata.
>>> This would require a bit of a hack to support ubiquitously, and
>>> wouldn't solve the more general problem.
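>>>
>>> That is, something like (the flag name being purely illustrative)
>>>
>>> error_handling:
>>>   output: errors
>>>   include_metadata: false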
>>>
>>> Maybe there are some other ideas as well?
