[ https://issues.apache.org/jira/browse/BEAM-10640?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17190825#comment-17190825 ]
Beam JIRA Bot commented on BEAM-10640:
--------------------------------------
This issue is assigned but has not received an update in 30 days so it has been
labeled "stale-assigned". If you are still working on the issue, please give an
update and remove the label. If you are no longer working on the issue, please
unassign so someone else may work on it. In 7 days the issue will be
automatically unassigned.
> Return ERROR details from apache_beam.io.gcp.bigquery.WriteToBigQuery failed
> inserts
> ------------------------------------------------------------------------------------
>
> Key: BEAM-10640
> URL: https://issues.apache.org/jira/browse/BEAM-10640
> Project: Beam
> Issue Type: Improvement
> Components: beam-community, io-py-gcp
> Affects Versions: 2.23.0
> Environment: LocalRunner, Beam v2.23
> Reporter: Renato Martins Leite
> Assignee: Aizhamal Nurmamat kyzy
> Priority: P1
> Labels: stale-assigned
>
> In BigQueryWriteFn._flush_batch (class BigQueryWriteFn(DoFn)), return an
> additional pvalue.TaggedOutput carrying the detailed error from each failed
> insertion to BigQuery.
>
> Today the failed-rows output contains only the row (payload) that failed,
> not the error itself, like this:
> {code:python}
> # Current return statement
> return [
>     pvalue.TaggedOutput(
>         BigQueryWriteFn.FAILED_ROWS,
>         GlobalWindows.windowed_value((destination, row)))
>     for row in failed_rows
> ]
> {code}
>
> For error analysis it is very important to understand what is causing the
> error.
> This same function already captures the errors from BigQuery, so it only
> needs to return them in an additional pvalue.TaggedOutput:
>
> {code:python}
> # Call that captures the errors
> passed, errors = self.bigquery_wrapper.insert_rows(
>     project_id=table_reference.projectId,
>     dataset_id=table_reference.datasetId,
>     table_id=table_reference.tableId,
>     rows=rows,
>     insert_ids=insert_ids,
>     skip_invalid_rows=True)
> {code}
> The new return would look like this:
>
> {code:python}
> # Proposed return statement
> # `error` is the BigQuery insert error matching `row` (binding not shown)
> return [
>     pvalue.TaggedOutput(
>         BigQueryWriteFn.FAILED_ROWS,
>         GlobalWindows.windowed_value((destination, row, error)))
>     for row in failed_rows
> ]
> {code}
> Thank you!
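
A minimal sketch of how each failed row could be paired with its error,
written in plain Python so it runs without Beam. It assumes the errors follow
the shape of BigQuery's insertAll response (entries with an `index` into the
submitted rows and an `errors` list). The helper name
`pair_failed_rows_with_errors` and the sample data are hypothetical and are
not taken from the Beam code base.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]
InsertError = Dict[str, Any]


def pair_failed_rows_with_errors(
    destination: str,
    rows: List[Row],
    insert_errors: List[InsertError],
) -> List[Tuple[str, Row, List[Dict[str, Any]]]]:
    """Build one (destination, row, errors) tuple per rejected row.

    Assumption: each entry in insert_errors has an "index" pointing into
    rows and an "errors" list describing why that row was rejected.
    """
    return [
        (destination, rows[entry["index"]], entry["errors"])
        for entry in insert_errors
    ]


# Hypothetical sample data: the second row is rejected.
rows = [{"id": 1}, {"id": "not-a-number"}, {"id": 3}]
insert_errors = [
    {
        "index": 1,
        "errors": [
            {"reason": "invalid", "location": "id", "message": "example message"}
        ],
    },
]

failed = pair_failed_rows_with_errors("project:dataset.table", rows, insert_errors)
for destination, row, errors in failed:
    print(destination, row, errors)
```

In the proposal above, each such tuple would be the value wrapped in the
FAILED_ROWS tagged output, so downstream steps can log or route rows by the
reason they failed.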