damccorm opened a new issue, #20362:
URL: https://github.com/apache/beam/issues/20362
In `class BigQueryWriteFn(DoFn)`, method `_flush_batch(self, destination)`:
Return an additional pvalue.TaggedOutput with the detailed error from
each failed insertion to BigQuery.
Today the failed-rows output contains only the row (payload) that failed, like this:
```
# Current return statement
return [
    pvalue.TaggedOutput(
        BigQueryWriteFn.FAILED_ROWS,
        GlobalWindows.windowed_value((destination, row)))
    for row in failed_rows
]
```
For error analysis it is essential to understand what is causing the
error.
This same function already receives the errors from BigQuery, so we only
need to return them in an additional pvalue.TaggedOutput:
```
# Call that captures the errors
passed, errors = self.bigquery_wrapper.insert_rows(
    project_id=table_reference.projectId,
    dataset_id=table_reference.datasetId,
    table_id=table_reference.tableId,
    rows=rows,
    insert_ids=insert_ids,
    skip_invalid_rows=True)
```
The new return would look like this:
```
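# New return statement; `error` stands for the BigQuery error
# reported for `row` (the pairing of rows to errors is not shown here)
return [
    pvalue.TaggedOutput(
        BigQueryWriteFn.FAILED_ROWS,
        GlobalWindows.windowed_value((destination, row, error)))
    for row in failed_rows
]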
```
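One way the failed rows could be paired with their errors is sketched below. It is plain Python, so it runs without Beam. It assumes each entry in `errors` is a dict with an `index` into `rows` and an `errors` list, following the shape of `insertErrors` in the BigQuery `tabledata.insertAll` response; the objects actually returned by `insert_rows` may differ. The helper name `pair_failed_rows` and the sample data are hypothetical.

```python
def pair_failed_rows(destination, rows, errors):
    """Return (destination, row, error_details) for every failed row.

    Assumption: each entry in `errors` looks like
    {"index": <position in rows>, "errors": [{"reason": ..., "message": ...}]}.
    """
    return [
        (destination, rows[entry["index"]], entry["errors"])
        for entry in errors
    ]


# Hypothetical sample data, for illustration only.
rows = [{"id": 1}, {"id": "not-a-number"}, {"id": 3}]
errors = [
    {"index": 1, "errors": [{"reason": "invalid", "message": "bad value"}]},
]

failed = pair_failed_rows("project:dataset.table", rows, errors)
```

Each resulting tuple could then be wrapped in `GlobalWindows.windowed_value(...)` and emitted through `pvalue.TaggedOutput`, as in the return statement above.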
Thank you!
Imported from Jira
[BEAM-10640](https://issues.apache.org/jira/browse/BEAM-10640). Original Jira
may contain additional context.
Reported by: leiterenato.