[ 
https://issues.apache.org/jira/browse/BEAM-12737?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17400627#comment-17400627
 ] 

Brachi Packter commented on BEAM-12737:
---------------------------------------

Yes, the best option is to get the original record back.
If that has a performance impact, it would be fine to send us the error
along with the record IDs instead. (We would need some API to set an ID on a
record, or we could send a KV with key: id and value: Row, and you would
return the keys of the failed records.)
I want to achieve 2 things:
1. Prevent the endless retries and the resulting backlog growth (for
unbounded pipelines).
2. Get the failed records and send them to a dead letter queue or side table.
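
The keyed variant described above can be sketched in plain Java. This is only
an illustration of the requested contract, not Beam API: the names
DeadLetterSketch, Result, Failure and process are hypothetical, a Map stands
in for a collection of KV<id, Row>, and a String stands in for a Row. Each
record is evaluated on its own, so a failure is captured with its id instead
of failing (and retrying) the whole batch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch (plain Java, no Beam): split records into results and failures. */
public class DeadLetterSketch {

    /** One failed record: the caller-assigned id plus a description of the error. */
    public static final class Failure {
        public final String id;
        public final String error;

        Failure(String id, String error) {
            this.id = id;
            this.error = error;
        }
    }

    /** Successful outputs keyed by record id, and the failures in input order. */
    public static final class Result<O> {
        public final Map<String, O> succeeded = new LinkedHashMap<>();
        public final List<Failure> failed = new ArrayList<>();
    }

    /** Applies fn to every record; a runtime error is recorded, never rethrown. */
    public static <I, O> Result<O> process(Map<String, I> records, Function<I, O> fn) {
        Result<O> result = new Result<>();
        for (Map.Entry<String, I> entry : records.entrySet()) {
            try {
                result.succeeded.put(entry.getKey(), fn.apply(entry.getValue()));
            } catch (RuntimeException e) {
                // The id is enough for the caller to look up the original record.
                result.failed.add(new Failure(entry.getKey(), e.toString()));
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> records = new LinkedHashMap<>();
        records.put("a", "1");
        records.put("b", "not-a-number"); // stands in for a failing cast
        records.put("c", null);           // stands in for a UDF hitting a null value
        records.put("d", "4");

        Result<Integer> result = process(records, s -> Integer.parseInt(s.trim()));
        System.out.println("succeeded: " + result.succeeded);
        for (Failure f : result.failed) {
            // In a real pipeline this is where the record would go to a DLQ or side table.
            System.out.println("dead letter: " + f.id + " -> " + f.error);
        }
    }
}
```

Returning only the ids keeps the failure output small, which is the trade-off
suggested above if carrying the full original record turns out to be too
expensive.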

> [SQL] add API to retrieve failed events in the collection due to query 
> runtime error 
> -------------------------------------------------------------------------------------
>
>                 Key: BEAM-12737
>                 URL: https://issues.apache.org/jira/browse/BEAM-12737
>             Project: Beam
>          Issue Type: Improvement
>          Components: dsl-sql
>            Reporter: Brachi Packter
>            Priority: P3
>
> When calling
> ```
> collection.apply(SqlTransform.query(query))
> ```
> we had cases in production where the query raised errors for a small number
> of events in the collection.
> For example:
> 1. Functions or UDFs that throw a NullPointerException when called with a
> null value.
> 2. Casts in SQL that fail on invalid data.
> And more.
> We can certainly fix those issues, but the point is that they can be missed
> while developing the query and only fail at runtime in production, in rare
> cases.
> The current behaviour is that the query fails and the records are retried
> forever.
> We would like an option to get the failed records from the query, so that we
> can send them to a DLQ or skip them entirely.
> Is it possible to have something similar to what BigQueryIO offers with
> `getFailedInsertsWithErr`?
> {code:java}
> .apply(BigQueryIO.write()
>         .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
>         .withExtendedErrorInfo())
>     .getFailedInsertsWithErr().apply(....);
> {code}
> This is also mentioned in the Beam style guide:
> https://beam.apache.org/contribute/ptransform-style-guide/#error-handling



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
