damccorm opened a new issue, #20362:
URL: https://github.com/apache/beam/issues/20362

   In `BigQueryWriteFn(DoFn)`, method `_flush_batch(self, destination)`:

   Return the detailed error from each failed insertion into BigQuery as an
   additional `pvalue.TaggedOutput`.

   Today the failed-rows output contains only the row (payload) that failed,
   like this:
   ```python
   # Current return statement: only the destination and the failed row
   return [
       pvalue.TaggedOutput(
           BigQueryWriteFn.FAILED_ROWS,
           GlobalWindows.windowed_value((destination, row)))
       for row in failed_rows
   ]
   ```

   For error analysis it is essential to know *what* caused each row to fail.
   This same function already receives that information from BigQuery, so it
   only needs to pass the error through in the tagged output. The errors are
   returned by this call:

   ```python
   # `errors` holds the insertion errors that BigQuery reported for this batch
   passed, errors = self.bigquery_wrapper.insert_rows(
       project_id=table_reference.projectId,
       dataset_id=table_reference.datasetId,
       table_id=table_reference.tableId,
       rows=rows,
       insert_ids=insert_ids,
       skip_invalid_rows=True)
   ```
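The `errors` value returned by the call above is what allows each failed row to be matched to its cause. The following is a minimal standalone sketch of that pairing. It assumes each entry in `errors` has the shape of an `insertErrors` item from the BigQuery `tabledata.insertAll` response: a dict with an `index` into the submitted rows and an `errors` list. The helper name `pair_failed_rows_with_errors` is hypothetical and is not part of Beam, and the sample data is made up.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]
RowErrors = List[Dict[str, Any]]


def pair_failed_rows_with_errors(
    rows: List[Row], errors: List[Dict[str, Any]]
) -> List[Tuple[Row, RowErrors]]:
    """Return a (row, row_errors) pair for every row BigQuery rejected.

    Hypothetical helper, not part of Beam. Assumes each entry in `errors`
    has the insertAll `insertErrors` shape: {"index": int, "errors": [...]}.
    """
    # "index" points back into the submitted `rows`, so the payload and
    # the error details explaining its rejection can travel together.
    return [(rows[entry["index"]], entry["errors"]) for entry in errors]


# Illustrative data only: row 1 has a non-integer value for "id".
rows = [{"id": 1, "name": "a"}, {"id": "x", "name": "b"}]
errors = [
    {
        "index": 1,
        "errors": [
            {"reason": "invalid", "location": "id", "message": "example error message"}
        ],
    }
]
failed = pair_failed_rows_with_errors(rows, errors)
print(failed)
```

A list built this way supplies the `error` that the proposed return statement needs for each row.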
   
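   The new return would look like this:

   ```python
   # Proposed return statement: also emit the BigQuery error for each row.
   # Assumes failed_rows is built as (row, error) pairs from `errors`.
   return [
       pvalue.TaggedOutput(
           BigQueryWriteFn.FAILED_ROWS,
           GlobalWindows.windowed_value((destination, row, error)))
       for row, error in failed_rows
   ]
   ```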
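With the error attached to each element, a downstream step could summarize why rows fail. The sketch below is plain Python over hypothetical `(destination, row, error)` tuples shaped like the proposed output. Here `error` is assumed to be the list of error dicts that BigQuery reported for that row, with fields such as `reason` and `location`. In a pipeline, equivalent logic would run over the elements of the `FAILED_ROWS` output. The function name, table names and values are illustrative only.

```python
from collections import Counter
from typing import Any, Dict, Iterable, List, Tuple

# Hypothetical element shape of the proposed output: (destination, row, error),
# where `error` is the list of error dicts BigQuery reported for that row.
FailedRow = Tuple[str, Dict[str, Any], List[Dict[str, Any]]]


def count_failure_reasons(failed: Iterable[FailedRow]) -> Counter:
    """Count how often each (destination, reason) pair occurs."""
    counts: Counter = Counter()
    for destination, _row, row_errors in failed:
        for err in row_errors:
            # Fall back to "unknown" when an error entry has no "reason".
            counts[(destination, err.get("reason", "unknown"))] += 1
    return counts


# Illustrative elements; table names and values are made up.
failed = [
    ("my-project:ds.events", {"id": "x"}, [{"reason": "invalid", "location": "id"}]),
    ("my-project:ds.events", {"id": "y"}, [{"reason": "invalid", "location": "id"}]),
    ("my-project:ds.users", {"age": "?"}, [{"location": "age"}]),
]
for (destination, reason), n in count_failure_reasons(failed).most_common():
    print(destination, reason, n)
```

Keying on the destination as well as the reason keeps failures from different tables apart. A run that writes to several tables is the case where a bare row payload gives the least insight.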
   
    Thank you!
   
   Imported from Jira [BEAM-10640](https://issues.apache.org/jira/browse/BEAM-10640).
   Original Jira may contain additional context.
   Reported by: leiterenato.

