Renato Martins Leite created BEAM-10640:
-------------------------------------------

             Summary: Return ERROR details from apache_beam.io.gcp.bigquery.WriteToBigQuery failed inserts
                 Key: BEAM-10640
                 URL: https://issues.apache.org/jira/browse/BEAM-10640
             Project: Beam
          Issue Type: Improvement
          Components: beam-community, io-py-gcp
    Affects Versions: 2.23.0
         Environment: LocalRunner, Beam v2.23
            Reporter: Renato Martins Leite
            Assignee: Aizhamal Nurmamat kyzy


In:
class BigQueryWriteFn(DoFn):
 - def _flush_batch(self, destination):
 
Return the detailed ERROR from each failed insertion to BigQuery as additional data in the pvalue.TaggedOutput, next to the failed row.
 
Today the failed-rows output contains only the row (payload) that failed, not the reason it failed:
{code:python}
# Current behaviour: only (destination, row) is emitted for each failed row.
return [
    pvalue.TaggedOutput(
        BigQueryWriteFn.FAILED_ROWS,
        GlobalWindows.windowed_value((destination, row)))
    for row in failed_rows
]
{code}
 
For error analysis, it is essential to understand WHAT is causing each failure.
The error details are already available in this same function, in the errors value returned by insert_rows, so they only need to be passed along in the pvalue.TaggedOutput:
 
{code:python}
# `errors` holds the per-row error entries BigQuery reported for this request.
passed, errors = self.bigquery_wrapper.insert_rows(
    project_id=table_reference.projectId,
    dataset_id=table_reference.datasetId,
    table_id=table_reference.tableId,
    rows=rows,
    insert_ids=insert_ids,
    skip_invalid_rows=True)
{code}
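A minimal sketch of how each failed row could be paired with its error details, assuming the entries in errors follow the shape of the BigQuery tabledata.insertAll response (insertErrors[], where each item has an "index" into the submitted rows and a list of "errors" with "reason", "location" and "message"). The entries are modelled as plain dicts here; inside Beam they may be client objects with attribute access instead. The helper name pair_failed_rows_with_errors is hypothetical and not part of Beam.

```python
from typing import Any, Dict, List, Tuple

# Hypothetical aliases: a row is a JSON-like dict, and an error entry mirrors
# one item of the insertAll response's `insertErrors` list.
Row = Dict[str, Any]
ErrorEntry = Dict[str, Any]


def pair_failed_rows_with_errors(
    rows: List[Row], errors: List[ErrorEntry]
) -> List[Tuple[Row, List[Dict[str, Any]]]]:
    """Return (row, row_errors) for every row that BigQuery rejected."""
    # "index" points into the `rows` list submitted in this request, and
    # "errors" lists every problem BigQuery reported for that row.
    return [(rows[entry["index"]], entry["errors"]) for entry in errors]


if __name__ == "__main__":
    submitted = [{"id": 1, "name": "ok"}, {"id": "abc", "name": "bad"}]
    # Illustrative response entry, not real BigQuery output.
    insert_errors = [
        {"index": 1,
         "errors": [{"reason": "invalid", "location": "id",
                     "message": "example: id is not an integer"}]},
    ]
    for row, row_errors in pair_failed_rows_with_errors(submitted, insert_errors):
        print(row["name"], [e["reason"] for e in row_errors])
```

Because the indices refer to the rows of one particular insert_rows request, the pairing would need to happen right after each call, before the batch is replaced for a retry.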
The new return would look like this:
 
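{code:python}
# Proposed: emit the BigQuery error next to each failed row.
# Here failed_rows is assumed to hold (row, error) pairs.
return [
    pvalue.TaggedOutput(
        BigQueryWriteFn.FAILED_ROWS,
        GlobalWindows.windowed_value((destination, row, error)))
    for row, error in failed_rows
]
{code}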
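To illustrate what the extra field would enable downstream, here is a sketch of a consumer that flattens each (destination, row, error) element into a record suitable for logging or a dead-letter table. It assumes error is the list of error dicts BigQuery reported for the row (each with "reason" and "message"); the function name describe_failed_insert and the output field names are hypothetical.

```python
import json
from typing import Any, Dict, List, Tuple

# Hypothetical element shape for the proposed failed-rows output.
FailedInsert = Tuple[str, Dict[str, Any], List[Dict[str, Any]]]


def describe_failed_insert(element: FailedInsert) -> Dict[str, str]:
    """Flatten a (destination, row, error) tuple into a dead-letter record."""
    destination, row, error = element
    # Deduplicate and sort reasons so records are stable across runs.
    reasons = sorted({e.get("reason", "unknown") for e in error})
    return {
        "destination": destination,
        "row": json.dumps(row, sort_keys=True),
        "reasons": ",".join(reasons),
        "messages": " | ".join(e.get("message", "") for e in error),
    }


if __name__ == "__main__":
    print(describe_failed_insert(
        ("my-project:my_dataset.my_table", {"id": "abc"},
         [{"reason": "invalid", "message": "example: id is not an integer"}])))
```

In a pipeline this could be applied with beam.Map(describe_failed_insert) to the failed-rows output. Keeping the full error list, rather than only the first reason, preserves every problem BigQuery reported for the row.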
 Thank you!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
