damccorm opened a new issue, #20522:
URL: https://github.com/apache/beam/issues/20522

   When using WriteToBigQuery in the Python SDK, a tagged output named FAILED_ROWS 
is returned containing tuples, each with two elements: a string with the 
destination table and a dict with the row's key/value pairs.
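   For reference, here is roughly how a pipeline consumes that output today (a 
minimal sketch, assuming the streaming-inserts path; the project, dataset, 
table, and schema names are made up):
   
   ```python
   # Minimal sketch of consuming the current two-element FAILED_ROWS output.
   # Project/dataset/table names and the schema are hypothetical.
   import apache_beam as beam
   from apache_beam.io.gcp.bigquery import BigQueryWriteFn, WriteToBigQuery
   from apache_beam.io.gcp.bigquery_tools import RetryStrategy

   with beam.Pipeline() as p:
     result = (
         p
         | beam.Create([{'id': 1, 'name': 'example'}])
         | WriteToBigQuery(
             table='my-project:my_dataset.my_table',
             schema='id:INTEGER,name:STRING',
             method=WriteToBigQuery.Method.STREAMING_INSERTS,
             insert_retry_strategy=RetryStrategy.RETRY_NEVER))

     # Each failed element is a (destination, row_dict) tuple; there is
     # currently no way to tell *why* the row was rejected.
     _ = (
         result[BigQueryWriteFn.FAILED_ROWS]
         | beam.Map(lambda failed: print('Failed insert:', failed)))
   ```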
   
   These tuples do not, however, carry any context about why the insert failed. 
I propose adding a third element to the tuple containing the corresponding 
InsertErrorsValueListEntry, so that the reason for the failure is readily 
available.
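   With that third element in place, a downstream handler could inspect the 
failure reason directly. A hypothetical example (InsertErrorsValueListEntry 
carries an `errors` list of ErrorProto messages, each with `reason` and 
`message` fields):
   
   ```python
   # Hypothetical consumer of the proposed (destination, row, entry) tuple.
   def log_failed_row(failed):
     destination, row, error_entry = failed
     # error_entry.errors holds the ErrorProto messages returned by the
     # BigQuery insertAll response for this row.
     for err in error_entry.errors:
       print('Row %s -> %s failed: %s (%s)' % (
           row, destination, err.reason, err.message))
   ```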
   
   Below is a patch to implement this change (based on 
[bigquery.py#L1186](https://github.com/apache/beam/blob/e39294dfcab25e2fab250a4691c8ee3ac390976d/sdks/python/apache_beam/io/gcp/bigquery.py#L1186)). 
I've made the patch locally and can create a PR if it helps (I just need access).
   
    
   ```python
   while True:
     passed, errors = self.bigquery_wrapper.insert_rows(
         project_id=table_reference.projectId,
         dataset_id=table_reference.datasetId,
         table_id=table_reference.tableId,
         rows=rows,
         insert_ids=insert_ids,
         skip_invalid_rows=True)

     # Changed: pair each failed row with its InsertErrorsValueListEntry so
     # the error context survives to the tagged output below.
     failed_rows = [(rows[entry.index], entry) for entry in errors]
     should_retry = any(
         bigquery_tools.RetryStrategy.should_retry(
             self._retry_strategy, entry.errors[0].reason) for entry in errors)
     if not passed:
       message = (
           'There were errors inserting to BigQuery. Will{} retry. '
           'Errors were {}'.format(("" if should_retry else " not"), errors))
       if should_retry:
         _LOGGER.warning(message)
       else:
         _LOGGER.error(message)

     rows = [rows[entry.index] for entry in errors]

     if not should_retry:
       break
     else:
       retry_backoff = next(self._backoff_calculator)
       _LOGGER.info(
           'Sleeping %s seconds before retrying insertion.', retry_backoff)
       time.sleep(retry_backoff)

   self._total_buffered_rows -= len(self._rows_buffer[destination])
   del self._rows_buffer[destination]

   # Changed: emit (destination, row, error_entry) instead of
   # (destination, row).
   return [
       pvalue.TaggedOutput(
           BigQueryWriteFn.FAILED_ROWS,
           GlobalWindows.windowed_value((destination, row[0], row[1])))
       for row in failed_rows
   ]
   ```
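   
   The behavioral change is confined to two statements: `failed_rows` now pairs 
each rejected row with its `InsertErrorsValueListEntry`, and the final 
`TaggedOutput` emits the three-element `(destination, row[0], row[1])` tuple 
in place of the current two-element one.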
   
   
   Imported from Jira 
[BEAM-10585](https://issues.apache.org/jira/browse/BEAM-10585). Original Jira 
may contain additional context.
   Reported by: nfbuckley.

