Neil Buckley created BEAM-10585:
-----------------------------------
Summary: Add error context to the BigQuery FAILED_ROWS tagged output
Key: BEAM-10585
URL: https://issues.apache.org/jira/browse/BEAM-10585
Project: Beam
Issue Type: New Feature
Components: sdk-py-core
Reporter: Neil Buckley
When using WriteToBigQuery in the Python SDK, the FAILED_ROWS tagged output is
a PCollection of tuples, each with two elements: the destination table as a
string and the row as a dict of key-value pairs.
The tuple does not, however, carry any context about why the insert failed. I
propose adding a third element to the tuple that holds the corresponding
InsertErrorsValueListEntry, so the reason each row was rejected is easy to
inspect downstream.
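A minimal, dependency-free sketch of the proposed element shape: today each element is (destination, row), and with the change the error entry travels along as a third element. FakeErrorProto and FakeInsertErrorEntry are hypothetical stand-ins that only mirror the reason/message and index/errors fields of the BigQuery API's ErrorProto and InsertErrorsValueListEntry messages; they are not the apitools classes. The helper name, table name, and rows are invented for illustration.

{code:python}
from dataclasses import dataclass, field
from typing import Any, Dict, List, Tuple


# Hypothetical stand-ins for the apitools messages returned by insert_rows.
@dataclass
class FakeErrorProto:
    reason: str
    message: str = ''


@dataclass
class FakeInsertErrorEntry:
    index: int  # position of the rejected row in the submitted batch
    errors: List[FakeErrorProto] = field(default_factory=list)


def failed_row_outputs(
        destination: str,
        rows: List[Dict[str, Any]],
        errors: List[FakeInsertErrorEntry],
) -> List[Tuple[str, Dict[str, Any], FakeInsertErrorEntry]]:
    """Pairs each rejected row with its error entry (proposed 3-tuple shape)."""
    return [(destination, rows[entry.index], entry) for entry in errors]


# Invented batch: the second row has a non-integer id.
rows = [{'id': 1, 'name': 'ok'}, {'id': 'x', 'name': 'bad'}]
errors = [FakeInsertErrorEntry(
    index=1, errors=[FakeErrorProto('invalid', 'id is not an integer')])]

for destination, row, entry in failed_row_outputs(
        'my-project:my_dataset.my_table', rows, errors):
    print(destination, row, entry.errors[0].reason)
{code}

Indexing rows by entry.index is the same lookup the patch performs; the only difference from the current output is that the entry is kept instead of discarded.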
Below is a patch implementing this change (against
[https://github.com/apache/beam/blob/e39294dfcab25e2fab250a4691c8ee3ac390976d/sdks/python/apache_beam/io/gcp/bigquery.py#L1186]).
I've made the patch locally and can open a PR if that helps (I just need access).
{code:python}
# Excerpt from the body of BigQueryWriteFn's batch-flush method in
# sdks/python/apache_beam/io/gcp/bigquery.py (not standalone code).
while True:
  passed, errors = self.bigquery_wrapper.insert_rows(
      project_id=table_reference.projectId,
      dataset_id=table_reference.datasetId,
      table_id=table_reference.tableId,
      rows=rows,
      insert_ids=insert_ids,
      skip_invalid_rows=True)

  # Keep each rejected row together with its InsertErrorsValueListEntry,
  # which carries the row's index and its list of errors (reason, message).
  failed_rows = [(rows[entry.index], entry) for entry in errors]
  should_retry = any(
      bigquery_tools.RetryStrategy.should_retry(
          self._retry_strategy, entry.errors[0].reason) for entry in errors)
  if not passed:
    message = (
        'There were errors inserting to BigQuery. Will{} retry. '
        'Errors were {}'.format(("" if should_retry else " not"), errors))
    if should_retry:
      _LOGGER.warning(message)
    else:
      _LOGGER.error(message)

  # Only the rejected rows (without their error entries) are resubmitted.
  rows = [row for row, _ in failed_rows]

  if not should_retry:
    break
  else:
    retry_backoff = next(self._backoff_calculator)
    _LOGGER.info(
        'Sleeping %s seconds before retrying insertion.', retry_backoff)
    time.sleep(retry_backoff)

self._total_buffered_rows -= len(self._rows_buffer[destination])
del self._rows_buffer[destination]

# Emit (destination, row, error_entry) instead of (destination, row).
return [
    pvalue.TaggedOutput(
        BigQueryWriteFn.FAILED_ROWS,
        GlobalWindows.windowed_value((destination, row, error)))
    for row, error in failed_rows
]
{code}
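With a change like this applied, a downstream step could flatten each three-element tuple into a dead-letter record. The sketch below is hypothetical: format_failed_row is an invented helper, and the error entry is faked with SimpleNamespace, assuming only that it exposes an errors list whose items have reason and message attributes, as InsertErrorsValueListEntry and ErrorProto do.

{code:python}
import json
from types import SimpleNamespace
from typing import Any, Dict, Tuple


def format_failed_row(element: Tuple[str, Dict[str, Any], Any]) -> Dict[str, str]:
    """Flattens a (destination, row, error_entry) tuple into a flat record."""
    destination, row, error_entry = element
    return {
        'destination': destination,
        # Serialize the row so the record schema does not depend on the table.
        'row': json.dumps(row, sort_keys=True),
        'reasons': ','.join(e.reason for e in error_entry.errors),
        'messages': '; '.join(e.message for e in error_entry.errors),
    }


# Hypothetical element, faked with SimpleNamespace instead of the apitools message.
entry = SimpleNamespace(
    index=0,
    errors=[SimpleNamespace(reason='invalid', message='no such field: nmae')])
print(format_failed_row(('my-project:my_dataset.my_table', {'nmae': 'x'}, entry)))
{code}

In a pipeline this would presumably be applied as result[BigQueryWriteFn.FAILED_ROWS] | beam.Map(format_failed_row), assuming the write result is indexed by that tag as it is for the current two-element output.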
--
This message was sent by Atlassian Jira
(v8.3.4#803005)