This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new abf488ee880 Revert "Merge pull request #17517 from [BEAM-14383]
Improve "FailedRows" errors returned by beam.io.WriteToBigQuery"
new 2f1117b00da Merge pull request #17601 from [BEAM-14447] Revert "Merge
pull request #17517 from [BEAM-14383] Improve "FailedRo…
abf488ee880 is described below
commit abf488ee880cfc40505d041add1c1afc141bbc36
Author: Pablo E <[email protected]>
AuthorDate: Tue May 10 09:49:28 2022 -0700
Revert "Merge pull request #17517 from [BEAM-14383] Improve "FailedRows"
errors returned by beam.io.WriteToBigQuery"
This reverts commit 358782006e1db86437b3bf61f910db12d654b1e0.
---
CHANGES.md | 1 -
sdks/python/apache_beam/io/gcp/bigquery.py | 8 +-
.../apache_beam/io/gcp/bigquery_write_it_test.py | 91 ----------------------
3 files changed, 3 insertions(+), 97 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index ad3095d9120..5064e86fc57 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -111,7 +111,6 @@
.withValueMapper(new TextMessageMapper());
```
* Coders in Python are expected to inherit from Coder.
([BEAM-14351](https://issues.apache.org/jira/browse/BEAM-14351)).
-* `FailedRows` key of the errors dictionary returned by
`beam.io.WriteToBigQuery` transform now returns an array of 3-element tuples
`(destination_table, row, reason)` instead of `(destination_table, row)`.
([BEAM-14383](https://issues.apache.org/jira/browse/BEAM-14383)).
## Deprecations
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py
b/sdks/python/apache_beam/io/gcp/bigquery.py
index 79f6081e796..2c21dca6047 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1754,8 +1754,7 @@ class BigQueryWriteFn(DoFn):
ignore_unknown_values=self.ignore_unknown_columns)
self.batch_latency_metric.update((time.time() - start) * 1000)
- failed_rows = [(rows[entry['index']], entry["errors"])
- for entry in errors]
+ failed_rows = [rows[entry['index']] for entry in errors]
should_retry = any(
RetryStrategy.should_retry(
self._retry_strategy, entry['errors'][0]['reason'])
@@ -1787,9 +1786,8 @@ class BigQueryWriteFn(DoFn):
return [
pvalue.TaggedOutput(
BigQueryWriteFn.FAILED_ROWS,
- GlobalWindows.windowed_value((destination, row, row_errors)))
- for row,
- row_errors in failed_rows
+ GlobalWindows.windowed_value((destination, row)))
+ for row in failed_rows
]
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index aa188e79ae9..dd2283eb71d 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -22,7 +22,6 @@
import base64
import datetime
-import json
import logging
import random
import time
@@ -374,96 +373,6 @@ class BigQueryWriteIntegrationTests(unittest.TestCase):
write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
temp_file_format=FileFormat.JSON))
- @pytest.mark.it_postcommit
- def test_big_query_write_insert_errors_reporting(self):
- """
- Test that errors returned by beam.io.WriteToBigQuery
- contain both the failed rows and the reason for it failing.
- """
- table_name = 'python_write_table'
- table_id = '{}.{}'.format(self.dataset_id, table_name)
-
- errors_table_name = table_name + '_error_records'
- errors_table_id = '{}.{}'.format(self.dataset_id, errors_table_name)
-
- input_data = [{
- 'number': 1,
- 'str': 'some_string',
- }, {
- 'number': 2
- },
- {
- 'number': 3,
- 'str': 'some_string',
- 'additional_field_str': 'some_string',
- }]
-
- table_schema = {
- "fields": [{
- "name": "number", "type": "INTEGER", 'mode': 'REQUIRED'
- }, {
- "name": "str", "type": "STRING", 'mode': 'REQUIRED'
- }]
- }
-
- errors_table_schema = {
- "fields": [{
- 'name': 'table', 'type': 'STRING', 'mode': 'REQUIRED'
- }, {
- 'name': 'reason', 'type': 'STRING', 'mode': 'NULLABLE'
- }, {
- 'name': 'row_json', 'type': 'STRING', 'mode': 'REQUIRED'
- }]
- }
-
- pipeline_verifiers = [
- BigqueryFullResultMatcher(
- project=self.project,
- query="SELECT number, str FROM %s" % table_id,
- data=[(1, 'some_string')]),
- BigqueryFullResultMatcher(
- project=self.project,
- query="SELECT table, reason, row_json FROM %s" % errors_table_id,
- data=
- [(
- table_id,
- '[{"reason": "invalid", "location": "", "debugInfo": "", \
-"message": "Missing required field: Msg_0_CLOUD_QUERY_TABLE.str."}]',
- '{"number": 2}'),
- (
- table_id,
- '[{"reason": "invalid", "location": "additional_field_str", \
-"debugInfo": "", "message": "no such field: additional_field_str."}]',
- '{"number": 3, "str": "some_string", "additional_field_str": \
-"some_string"}')])
- ]
-
- args = self.test_pipeline.get_full_options_as_args(
- on_success_matcher=hc.all_of(*pipeline_verifiers))
-
- with beam.Pipeline(argv=args) as p:
- # pylint: disable=expression-not-assigned
- errors = (
- p | 'create' >> beam.Create(input_data)
- | 'write' >> beam.io.WriteToBigQuery(
- table_id,
- schema=table_schema,
- create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
- write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))
- (
- errors["FailedRows"]
- | 'ParseErrors' >> beam.Map(
- lambda err: {
- "table": err[0],
- "reason": json.dumps(err[2]),
- "row_json": json.dumps(err[1])
- })
- | 'WriteErrors' >> beam.io.WriteToBigQuery(
- errors_table_id,
- schema=errors_table_schema,
- create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
- write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))
-
@pytest.mark.it_postcommit
@parameterized.expand([
param(file_format=FileFormat.AVRO),