[
https://issues.apache.org/jira/browse/BEAM-14383?focusedWorklogId=764677&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-764677
]
ASF GitHub Bot logged work on BEAM-14383:
-----------------------------------------
Author: ASF GitHub Bot
Created on: 30/Apr/22 16:29
Start Date: 30/Apr/22 16:29
Worklog Time Spent: 10m
Work Description: Firlej opened a new pull request, #17517:
URL: https://github.com/apache/beam/pull/17517
`WriteToBigQuery` returns `errors` when it tries to insert rows that do not
match the BigQuery table schema. `errors` is a dictionary that contains a
single `FailedRows` key. `FailedRows` is a list of tuples where each tuple has
two elements: the BigQuery table name and the row that didn't match the schema.
This can be verified by running the `BigQueryIO deadletter pattern`:
https://beam.apache.org/documentation/patterns/bigqueryio/
Using the template approach I can print the failed rows in a pipeline. While
the job is running, the logger also prints out the reason why the rows were
invalid.
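For reference, here is a minimal sketch (following the deadletter pattern
linked above) of how the current two-element `FailedRows` tuples can be
inspected; `errors` stands for the output of `beam.io.WriteToBigQuery`, as in
the full pipeline shown further below:
```python
import logging

import apache_beam as beam

# `errors` is assumed to be the result returned by beam.io.WriteToBigQuery,
# which exposes a single "FailedRows" key (see the full pipeline below).
_ = (
    errors["FailedRows"]
    # Each element is currently a (table_name, row) tuple; the reason the
    # insert failed only shows up in the worker logs, not in the tuple itself.
    | "LogFailedRows" >> beam.Map(
        lambda err: logging.warning("Failed row for table %s: %s", err[0], err[1]))
)
```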
The reason the row is invalid should also be included in the tuple, in addition
to the BigQuery table name and the raw row. That way a downstream pipeline
could, e.g., insert the invalid rows into a dedicated error table with a schema
such as:
```python
error_schema = {
    'fields': [
        {'name': 'timestamp', 'type': 'TIMESTAMP', 'mode': 'REQUIRED'},
        {'name': 'table', 'type': 'STRING', 'mode': 'REQUIRED'},
        {'name': 'reason', 'type': 'STRING', 'mode': 'NULLABLE'},
        {'name': 'row_json', 'type': 'STRING', 'mode': 'REQUIRED'},
    ]
}
```
The whole pipeline implementation could look something like this:
```python
import json
import time

import apache_beam as beam
from apache_beam.io.gcp.bigquery_tools import RetryStrategy

# `pipeline_options`, `known_args` and `schema` are assumed to come from the
# usual argparse / PipelineOptions setup.
with beam.Pipeline(options=pipeline_options) as p:
    errors = (
        p
        | "Read from Pub/Sub subscription" >> beam.io.gcp.pubsub.ReadFromPubSub(
            subscription=known_args.input_subscription,
            timestamp_attribute=None)
        | "UTF-8 bytes to string" >> beam.Map(lambda msg: msg.decode("utf-8"))
        | "Parse JSON messages" >> beam.Map(json.loads)
        | "WriteToBigQuery" >> beam.io.WriteToBigQuery(
            known_args.output_table,
            schema=schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            ignore_unknown_columns=False,
            insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR,
        )
    )

    result = (
        errors["FailedRows"]
        | "ParseErrors" >> beam.Map(lambda err: {
            "timestamp": time.time_ns() / 1_000_000_000,
            "table": err[0],
            "reason": None,  # TODO: to be replaced with `err[2]`
            "row_json": json.dumps(err[1]),
        })
        | "WriteErrorsToBigQuery" >> beam.io.WriteToBigQuery(
            known_args.output_table + "_error_records",
            schema=error_schema,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            ignore_unknown_columns=False,
            insert_retry_strategy=RetryStrategy.RETRY_ON_TRANSIENT_ERROR,
        )
    )
```
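Once the reason is included in each `FailedRows` tuple (the change this PR
proposes), the `ParseErrors` step above could fill the `reason` field directly
instead of hard-coding `None`. A sketch of the intended end state, assuming the
proposed three-element `(table, row, reason)` tuple shape and the imports from
the snippet above:
```python
def to_error_record(err):
    # Proposed shape (not the current API): (table_name, row_dict, error_reason).
    table_name, row_dict, error_reason = err
    return {
        "timestamp": time.time_ns() / 1_000_000_000,
        "table": table_name,
        "reason": error_reason,
        "row_json": json.dumps(row_dict),
    }

# The ParseErrors step would then simply become:
#   | "ParseErrors" >> beam.Map(to_error_record)
```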
During my research I found a couple of alternative solutions, but they are
more complex than they need to be. That's why I explored the Beam source code
and found that the fix is a small, simple change.
------------------------
Thank you for your contribution! Follow this checklist to help us
incorporate your contribution quickly and easily:
- [x] [**Choose
reviewer(s)**](https://beam.apache.org/contribute/#make-your-change) and
mention them in a comment (`R: @username`).
- [x] Format the pull request title like `[BEAM-XXX] Fixes bug in
ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate JIRA
issue, if applicable. This will automatically link the pull request to the
issue.
- [ ] Update `CHANGES.md` with noteworthy changes.
- [ ] If this contribution is large, please file an Apache [Individual
Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
See the [Contributor Guide](https://beam.apache.org/contribute) for more
tips on [how to make review process
smoother](https://beam.apache.org/contribute/#make-reviewers-job-easier).
To check the build health, please visit
[https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md](https://github.com/apache/beam/blob/master/.test-infra/BUILD_STATUS.md)
GitHub Actions Tests Status (on master branch):
- [Build python source distribution and wheels](https://github.com/apache/beam/actions?query=workflow%3A%22Build+python+source+distribution+and+wheels%22+branch%3Amaster+event%3Aschedule)
- [Python Tests](https://github.com/apache/beam/actions?query=workflow%3A%22Python+Tests%22+branch%3Amaster+event%3Aschedule)
- [Java Tests](https://github.com/apache/beam/actions?query=workflow%3A%22Java+Tests%22+branch%3Amaster+event%3Aschedule)
See [CI.md](https://github.com/apache/beam/blob/master/CI.md) for more
information about GitHub Actions CI.
Issue Time Tracking
-------------------
Worklog Id: (was: 764677)
Time Spent: 1h 10m (was: 1h)
> Improve "FailedRows" errors returned by beam.io.WriteToBigQuery
> ---------------------------------------------------------------
>
> Key: BEAM-14383
> URL: https://issues.apache.org/jira/browse/BEAM-14383
> Project: Beam
> Issue Type: Improvement
> Components: io-py-gcp
> Reporter: Oskar Firlej
> Priority: P2
> Time Spent: 1h 10m
> Remaining Estimate: 0h
>
> `WriteToBigQuery` returns `errors` when it tries to insert rows that do not
> match the BigQuery table schema. `errors` is a dictionary that contains a
> single `FailedRows` key. `FailedRows` is a list of tuples where each tuple
> has two elements: the BigQuery table name and the row that didn't match the
> schema.
> This can be verified by running the `BigQueryIO deadletter pattern`
> https://beam.apache.org/documentation/patterns/bigqueryio/
> Using this approach I can print the failed rows in a pipeline. While the job
> is running, the logger also prints out the reason why the rows were invalid.
> The reason should also be included in the tuple in addition to the BigQuery
> table and the raw row. This way the next pipeline could process both the
> invalid row and the reason why it is invalid.
> During my research I found a couple of alternative solutions, but I think they
> are more complex than they need to be. That's why I explored the Beam source
> code and found the solution to be an easy and simple change.
--
This message was sent by Atlassian Jira
(v8.20.7#820007)