This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new abf488ee880 Revert "Merge pull request #17517 from [BEAM-14383] Improve "FailedRows" errors returned by beam.io.WriteToBigQuery"
     new 2f1117b00da Merge pull request #17601 from [BEAM-14447] Revert "Merge pull request #17517 from [BEAM-14383] Improve "FailedRo…
abf488ee880 is described below

commit abf488ee880cfc40505d041add1c1afc141bbc36
Author: Pablo E <[email protected]>
AuthorDate: Tue May 10 09:49:28 2022 -0700

    Revert "Merge pull request #17517 from [BEAM-14383] Improve "FailedRows" errors returned by beam.io.WriteToBigQuery"
    
    This reverts commit 358782006e1db86437b3bf61f910db12d654b1e0.
---
 CHANGES.md                                         |  1 -
 sdks/python/apache_beam/io/gcp/bigquery.py         |  8 +-
 .../apache_beam/io/gcp/bigquery_write_it_test.py   | 91 ----------------------
 3 files changed, 3 insertions(+), 97 deletions(-)

diff --git a/CHANGES.md b/CHANGES.md
index ad3095d9120..5064e86fc57 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -111,7 +111,6 @@
         .withValueMapper(new TextMessageMapper());
 ```
 * Coders in Python are expected to inherit from Coder. ([BEAM-14351](https://issues.apache.org/jira/browse/BEAM-14351)).
-* `FailedRows` key of the errors dictionary returned by `beam.io.WriteToBigQuery` transform now returns an array of 3-element tuples `(destination_table, row, reason)` instead of `(destination_table, row)`. ([BEAM-14383](https://issues.apache.org/jira/browse/BEAM-14383)).
 
 ## Deprecations
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 79f6081e796..2c21dca6047 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1754,8 +1754,7 @@ class BigQueryWriteFn(DoFn):
           ignore_unknown_values=self.ignore_unknown_columns)
       self.batch_latency_metric.update((time.time() - start) * 1000)
 
-      failed_rows = [(rows[entry['index']], entry["errors"])
-                     for entry in errors]
+      failed_rows = [rows[entry['index']] for entry in errors]
       should_retry = any(
           RetryStrategy.should_retry(
               self._retry_strategy, entry['errors'][0]['reason'])
@@ -1787,9 +1786,8 @@ class BigQueryWriteFn(DoFn):
     return [
         pvalue.TaggedOutput(
             BigQueryWriteFn.FAILED_ROWS,
-            GlobalWindows.windowed_value((destination, row, row_errors)))
-        for row,
-        row_errors in failed_rows
+            GlobalWindows.windowed_value((destination, row)))
+        for row in failed_rows
     ]
 
 
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index aa188e79ae9..dd2283eb71d 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -22,7 +22,6 @@
 
 import base64
 import datetime
-import json
 import logging
 import random
 import time
@@ -374,96 +373,6 @@ class BigQueryWriteIntegrationTests(unittest.TestCase):
               write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
               temp_file_format=FileFormat.JSON))
 
-  @pytest.mark.it_postcommit
-  def test_big_query_write_insert_errors_reporting(self):
-    """
-    Test that errors returned by beam.io.WriteToBigQuery
-    contain both the failed rows amd the reason for it failing.
-    """
-    table_name = 'python_write_table'
-    table_id = '{}.{}'.format(self.dataset_id, table_name)
-
-    errors_table_name = table_name + '_error_records'
-    errors_table_id = '{}.{}'.format(self.dataset_id, errors_table_name)
-
-    input_data = [{
-        'number': 1,
-        'str': 'some_string',
-    }, {
-        'number': 2
-    },
-                  {
-                      'number': 3,
-                      'str': 'some_string',
-                      'additional_field_str': 'some_string',
-                  }]
-
-    table_schema = {
-        "fields": [{
-            "name": "number", "type": "INTEGER", 'mode': 'REQUIRED'
-        }, {
-            "name": "str", "type": "STRING", 'mode': 'REQUIRED'
-        }]
-    }
-
-    errors_table_schema = {
-        "fields": [{
-            'name': 'table', 'type': 'STRING', 'mode': 'REQUIRED'
-        }, {
-            'name': 'reason', 'type': 'STRING', 'mode': 'NULLABLE'
-        }, {
-            'name': 'row_json', 'type': 'STRING', 'mode': 'REQUIRED'
-        }]
-    }
-
-    pipeline_verifiers = [
-        BigqueryFullResultMatcher(
-            project=self.project,
-            query="SELECT number, str FROM %s" % table_id,
-            data=[(1, 'some_string')]),
-        BigqueryFullResultMatcher(
-            project=self.project,
-            query="SELECT table, reason, row_json FROM %s" % errors_table_id,
-            data=
-            [(
-                table_id,
-                '[{"reason": "invalid", "location": "", "debugInfo": "", \
-"message": "Missing required field: Msg_0_CLOUD_QUERY_TABLE.str."}]',
-                '{"number": 2}'),
-             (
-                 table_id,
-                 '[{"reason": "invalid", "location": "additional_field_str", \
-"debugInfo": "", "message": "no such field: additional_field_str."}]',
-                 '{"number": 3, "str": "some_string", "additional_field_str": \
-"some_string"}')])
-    ]
-
-    args = self.test_pipeline.get_full_options_as_args(
-        on_success_matcher=hc.all_of(*pipeline_verifiers))
-
-    with beam.Pipeline(argv=args) as p:
-      # pylint: disable=expression-not-assigned
-      errors = (
-          p | 'create' >> beam.Create(input_data)
-          | 'write' >> beam.io.WriteToBigQuery(
-              table_id,
-              schema=table_schema,
-              create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-              write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))
-      (
-          errors["FailedRows"]
-          | 'ParseErrors' >> beam.Map(
-              lambda err: {
-                  "table": err[0],
-                  "reason": json.dumps(err[2]),
-                  "row_json": json.dumps(err[1])
-              })
-          | 'WriteErrors' >> beam.io.WriteToBigQuery(
-              errors_table_id,
-              schema=errors_table_schema,
-              create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-              write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY))
-
   @pytest.mark.it_postcommit
   @parameterized.expand([
       param(file_format=FileFormat.AVRO),

Reply via email to