This is an automated email from the ASF dual-hosted git repository.

damccorm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new e640b2578fb Force BQIO to output elements in the correct row (#32584)
e640b2578fb is described below

commit e640b2578fb1d31ac5823fc95cceff346624d4b3
Author: Danny McCormick <[email protected]>
AuthorDate: Mon Sep 30 12:43:17 2024 -0400

    Force BQIO to output elements in the correct row (#32584)
    
    * Fix bqio
    
    * import fix
    
    * syntax
    
    * feedback
---
 .github/trigger_files/beam_PostCommit_Python.json  |  2 +-
 ...tCommit_Python_ValidatesContainer_Dataflow.json |  3 +-
 sdks/python/apache_beam/io/gcp/bigquery.py         | 59 +++++++++++++---------
 .../apache_beam/io/gcp/bigquery_write_it_test.py   |  1 +
 4 files changed, 38 insertions(+), 27 deletions(-)

diff --git a/.github/trigger_files/beam_PostCommit_Python.json b/.github/trigger_files/beam_PostCommit_Python.json
index 30ee463ad4e..1eb60f6e495 100644
--- a/.github/trigger_files/beam_PostCommit_Python.json
+++ b/.github/trigger_files/beam_PostCommit_Python.json
@@ -1,5 +1,5 @@
 {
   "comment": "Modify this file in a trivial way to cause this test suite to run.",
-  "modification": 2
+  "modification": 3
 }
 
diff --git a/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json b/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json
index d6c608f6dab..4897480d69a 100644
--- a/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json
+++ b/.github/trigger_files/beam_PostCommit_Python_ValidatesContainer_Dataflow.json
@@ -1,3 +1,4 @@
 {
-    "comment": "Modify this file in a trivial way to cause this test suite to run"
+    "comment": "Modify this file in a trivial way to cause this test suite to run",
+    "modification": 1
 }
\ No newline at end of file
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index b897df2d32a..2cb64742f26 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -418,7 +418,6 @@ from apache_beam.transforms.external import SchemaAwareExternalTransform
 from apache_beam.transforms.sideinputs import SIDE_INPUT_PREFIX
 from apache_beam.transforms.sideinputs import get_sideinput_index
 from apache_beam.transforms.util import ReshufflePerKey
-from apache_beam.transforms.window import GlobalWindows
 from apache_beam.typehints.row_type import RowTypeConstraint
 from apache_beam.typehints.schemas import schema_from_element_type
 from apache_beam.utils import retry
@@ -1581,7 +1580,8 @@ class BigQueryWriteFn(DoFn):
         additional_create_parameters=self.additional_bq_parameters)
     _KNOWN_TABLES.add(str_table_reference)
 
-  def process(self, element, *schema_side_inputs):
+  def process(
+      self, element, window_value=DoFn.WindowedValueParam, *schema_side_inputs):
     destination = bigquery_tools.get_hashable_destination(element[0])
 
     if callable(self.schema):
@@ -1608,12 +1608,11 @@ class BigQueryWriteFn(DoFn):
         return [
             pvalue.TaggedOutput(
                 BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
-                GlobalWindows.windowed_value(
+                window_value.with_value(
                     (destination, row_and_insert_id[0], error))),
             pvalue.TaggedOutput(
                 BigQueryWriteFn.FAILED_ROWS,
-                GlobalWindows.windowed_value(
-                    (destination, row_and_insert_id[0])))
+                window_value.with_value((destination, row_and_insert_id[0])))
         ]
 
       # Flush current batch first if adding this row will exceed our limits
@@ -1624,11 +1623,11 @@ class BigQueryWriteFn(DoFn):
         flushed_batch = self._flush_batch(destination)
         # After flushing our existing batch, we now buffer the current row
         # for the next flush
-        self._rows_buffer[destination].append(row_and_insert_id)
+        self._rows_buffer[destination].append((row_and_insert_id, window_value))
         self._destination_buffer_byte_size[destination] = row_byte_size
         return flushed_batch
 
-      self._rows_buffer[destination].append(row_and_insert_id)
+      self._rows_buffer[destination].append((row_and_insert_id, window_value))
       self._destination_buffer_byte_size[destination] += row_byte_size
       self._total_buffered_rows += 1
       if self._total_buffered_rows >= self._max_buffered_rows:
@@ -1636,7 +1635,8 @@ class BigQueryWriteFn(DoFn):
     else:
       # The input is already batched per destination, flush the rows now.
       batched_rows = element[1]
-      self._rows_buffer[destination].extend(batched_rows)
+      for r in batched_rows:
+        self._rows_buffer[destination].append((r, window_value))
       return self._flush_batch(destination)
 
   def finish_bundle(self):
@@ -1659,7 +1659,7 @@ class BigQueryWriteFn(DoFn):
   def _flush_batch(self, destination):
 
     # Flush the current batch of rows to BigQuery.
-    rows_and_insert_ids = self._rows_buffer[destination]
+    rows_and_insert_ids_with_windows = self._rows_buffer[destination]
     table_reference = bigquery_tools.parse_table_reference(destination)
     if table_reference.projectId is None:
       table_reference.projectId = vp.RuntimeValueProvider.get_value(
@@ -1668,9 +1668,10 @@ class BigQueryWriteFn(DoFn):
     _LOGGER.debug(
         'Flushing data to %s. Total %s rows.',
         destination,
-        len(rows_and_insert_ids))
-    self.batch_size_metric.update(len(rows_and_insert_ids))
+        len(rows_and_insert_ids_with_windows))
+    self.batch_size_metric.update(len(rows_and_insert_ids_with_windows))
 
+    rows_and_insert_ids, window_values = zip(*rows_and_insert_ids_with_windows)
     rows = [r[0] for r in rows_and_insert_ids]
     if self.ignore_insert_ids:
       insert_ids = [None for r in rows_and_insert_ids]
@@ -1689,8 +1690,10 @@ class BigQueryWriteFn(DoFn):
           ignore_unknown_values=self.ignore_unknown_columns)
       self.batch_latency_metric.update((time.time() - start) * 1000)
 
-      failed_rows = [(rows[entry['index']], entry["errors"])
+      failed_rows = [(
+          rows[entry['index']], entry["errors"], window_values[entry['index']])
                      for entry in errors]
+      failed_insert_ids = [insert_ids[entry['index']] for entry in errors]
       retry_backoff = next(self._backoff_calculator, None)
 
       # If retry_backoff is None, then we will not retry and must log.
@@ -1721,7 +1724,11 @@ class BigQueryWriteFn(DoFn):
         _LOGGER.info(
             'Sleeping %s seconds before retrying insertion.', retry_backoff)
         time.sleep(retry_backoff)
+        # We can now safely discard all information about successful rows and
+        # just focus on the failed ones
         rows = [fr[0] for fr in failed_rows]
+        window_values = [fr[2] for fr in failed_rows]
+        insert_ids = failed_insert_ids
         self._throttled_secs.inc(retry_backoff)
 
     self._total_buffered_rows -= len(self._rows_buffer[destination])
@@ -1729,19 +1736,21 @@ class BigQueryWriteFn(DoFn):
     if destination in self._destination_buffer_byte_size:
       del self._destination_buffer_byte_size[destination]
 
-    return itertools.chain([
-        pvalue.TaggedOutput(
-            BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
-            GlobalWindows.windowed_value((destination, row, err))) for row,
-        err in failed_rows
-    ],
-                           [
-                               pvalue.TaggedOutput(
-                                   BigQueryWriteFn.FAILED_ROWS,
-                                   GlobalWindows.windowed_value(
-                                       (destination, row))) for row,
-                               unused_err in failed_rows
-                           ])
+    return itertools.chain(
+        [
+            pvalue.TaggedOutput(
+                BigQueryWriteFn.FAILED_ROWS_WITH_ERRORS,
+                w.with_value((destination, row, err))) for row,
+            err,
+            w in failed_rows
+        ],
+        [
+            pvalue.TaggedOutput(
+                BigQueryWriteFn.FAILED_ROWS, w.with_value((destination, row)))
+            for row,
+            unused_err,
+            w in failed_rows
+        ])
 
 
 # The number of shards per destination when writing via streaming inserts.
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index b0140793cf7..cd3edf19de5 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -491,6 +491,7 @@ class BigQueryWriteIntegrationTests(unittest.TestCase):
       # pylint: disable=expression-not-assigned
       errors = (
           p | 'create' >> beam.Create(input_data)
+          | beam.WindowInto(beam.transforms.window.FixedWindows(10))
           | 'write' >> beam.io.WriteToBigQuery(
               table_id,
               schema=table_schema,

Reply via email to