Neil Buckley created BEAM-10585:
-----------------------------------

             Summary: Add error context to the BigQuery FAILED_ROWS tagged 
output
                 Key: BEAM-10585
                 URL: https://issues.apache.org/jira/browse/BEAM-10585
             Project: Beam
          Issue Type: New Feature
          Components: sdk-py-core
            Reporter: Neil Buckley


When using WriteToBigQuery in the Python SDK, a tagged output named FAILED_ROWS 
is returned containing the rows that could not be inserted. Each element is a 
two-element tuple: a string with the destination table and a dict with the 
row's key-value pairs.
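For reference, consuming the current two-element output looks roughly like this 
(a minimal sketch; the table, the input data, and the log_failed_row helper are 
all illustrative, and it assumes an existing table written via streaming 
inserts with retries disabled so failures surface immediately):

{code:python}
import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryWriteFn, WriteToBigQuery

def log_failed_row(failed):
  # Today each element is (table_destination, row_dict); nothing here
  # explains *why* the insert failed.
  destination, row = failed
  print('Insert failed for table %s: %s' % (destination, row))

with beam.Pipeline() as p:
  result = (
      p
      | beam.Create([{'name': 'example'}])
      | WriteToBigQuery(
          'project:dataset.table',  # assumes the table already exists
          method='STREAMING_INSERTS',
          insert_retry_strategy='RETRY_NEVER'))
  _ = result[BigQueryWriteFn.FAILED_ROWS] | beam.Map(log_failed_row)
{code}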

This tuple does not, however, contain any context about why the insert failed. 
I propose adding a third element to the tuple: the InsertErrorsValueListEntry 
instance for the row, which would make the cause of the failure readily 
available to downstream consumers.
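For context, that entry wraps one element of the insertErrors list in the 
BigQuery insertAll response, so each failed row would carry its per-row index 
plus the reason, location, and message of each error. Shown here as an 
equivalent Python dict (values illustrative):

{code:python}
# Shape of one insertErrors element from the BigQuery insertAll response,
# i.e. what an InsertErrorsValueListEntry carries (values illustrative):
insert_errors_entry = {
    'index': 0,  # position of the failed row in the insert request
    'errors': [{
        'reason': 'invalid',
        'location': 'age',
        'message': 'Cannot convert value to integer (bad value): abc',
    }],
}
{code}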

Below is a patch implementing this change against 
[https://github.com/apache/beam/blob/e39294dfcab25e2fab250a4691c8ee3ac390976d/sdks/python/apache_beam/io/gcp/bigquery.py#L1186].
 I've made the patch locally and can create a PR if it helps (I just need access).

 
{code:python}
while True:
  passed, errors = self.bigquery_wrapper.insert_rows(
      project_id=table_reference.projectId,
      dataset_id=table_reference.datasetId,
      table_id=table_reference.tableId,
      rows=rows,
      insert_ids=insert_ids,
      skip_invalid_rows=True)

  # Changed: keep each failed row's InsertErrorsValueListEntry alongside
  # the row so the error context can be surfaced downstream.
  failed_rows = [(rows[entry.index], entry) for entry in errors]
  should_retry = any(
      bigquery_tools.RetryStrategy.should_retry(
          self._retry_strategy, entry.errors[0].reason) for entry in errors)
  if not passed:
    message = (
        'There were errors inserting to BigQuery. Will{} retry. '
        'Errors were {}'.format(("" if should_retry else " not"), errors))
    if should_retry:
      _LOGGER.warning(message)
    else:
      _LOGGER.error(message)

  rows = [rows[entry.index] for entry in errors]

  if not should_retry:
    break
  else:
    retry_backoff = next(self._backoff_calculator)
    _LOGGER.info(
        'Sleeping %s seconds before retrying insertion.', retry_backoff)
    time.sleep(retry_backoff)

self._total_buffered_rows -= len(self._rows_buffer[destination])
del self._rows_buffer[destination]

return [
    pvalue.TaggedOutput(
        BigQueryWriteFn.FAILED_ROWS,
        # Changed: emit (destination, row, insert_errors_entry) rather
        # than the previous (destination, row) pair.
        GlobalWindows.windowed_value((destination, row[0], row[1])))
    for row in failed_rows
]
{code}
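
With this change, a downstream consumer could unpack the error context 
directly from the tagged output. A sketch (the helper name and output shape 
are illustrative):

{code:python}
def explain_failed_row(failed):
  # With the proposed change each element is a three-tuple; `entry` is
  # the InsertErrorsValueListEntry for the row.
  destination, row, entry = failed
  return {
      'destination': destination,
      'row': row,
      'reasons': [err.reason for err in entry.errors],
  }

# e.g.: result[BigQueryWriteFn.FAILED_ROWS] | beam.Map(explain_failed_row)
{code}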


