This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new c33a1ca  [BEAM-12079] Deterministic coding enforcement causes _StreamToBigQuery/CommitInsertIds/GroupByKey to fail
     new 5ddb69f  Merge pull request #14396 from [BEAM-12079] Fix _StreamToBigQuery/CommitInsertIds/GroupByKey failure
c33a1ca is described below

commit c33a1cad8164939a7a75cee78e48f37c9dfcf129
Author: Ludovic Post <[email protected]>
AuthorDate: Wed Mar 31 15:23:17 2021 -0700

    [BEAM-12079] Deterministic coding enforcement causes _StreamToBigQuery/CommitInsertIds/GroupByKey to fail
---
 sdks/python/apache_beam/io/gcp/bigquery.py | 9 +++++----
 1 file changed, 5 insertions(+), 4 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 3097d02..c4edaea 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1431,7 +1431,7 @@ class _StreamToBigQuery(PTransform):
 
     def _restore_table_ref(sharded_table_ref_elems_kv):
       sharded_table_ref = sharded_table_ref_elems_kv[0]
-      table_ref = bigquery_tools.parse_table_reference(sharded_table_ref.key)
+      table_ref = bigquery_tools.parse_table_reference(sharded_table_ref)
       return (table_ref, sharded_table_ref_elems_kv[1])
 
     tagged_data = (
@@ -1439,7 +1439,8 @@ class _StreamToBigQuery(PTransform):
         | 'AppendDestination' >> beam.ParDo(
             bigquery_tools.AppendDestinationsFn(self.table_reference),
             *self.table_side_inputs)
-        | 'AddInsertIds' >> beam.ParDo(_StreamToBigQuery.InsertIdPrefixFn()))
+        | 'AddInsertIds' >> beam.ParDo(_StreamToBigQuery.InsertIdPrefixFn())
+        | 'ToHashableTableRef' >> beam.Map(_to_hashable_table_ref))
 
     if not self.with_auto_sharding:
       tagged_data = (
@@ -1458,14 +1459,14 @@ class _StreamToBigQuery(PTransform):
       # references are restored.
       tagged_data = (
           tagged_data
-          | 'ToHashableTableRef' >> beam.Map(_to_hashable_table_ref)
           | 'WithAutoSharding' >> beam.GroupIntoBatches.WithShardedKey(
               (self.batch_size or BigQueryWriteFn.DEFAULT_MAX_BUFFERED_ROWS),
               DEFAULT_BATCH_BUFFERING_DURATION_LIMIT_SEC)
-          | 'FromHashableTableRefAndDropShard' >> beam.Map(_restore_table_ref))
+          | 'DropShard' >> beam.Map(lambda kv: (kv[0].key, kv[1])))
 
     return (
         tagged_data
+        | 'FromHashableTableRef' >> beam.Map(_restore_table_ref)
         | 'StreamInsertRows' >> ParDo(
             bigquery_write_fn, *self.schema_side_inputs).with_outputs(
                 BigQueryWriteFn.FAILED_ROWS, main='main'))

Reply via email to