Abacn commented on code in PR #28009:
URL: https://github.com/apache/beam/pull/28009#discussion_r1300291775


##########
sdks/python/apache_beam/io/gcp/bigquery_tools.py:
##########
@@ -1558,23 +1558,32 @@ def beam_row_from_dict(row: dict, schema):
   """
   if not isinstance(schema, (bigquery.TableSchema, bigquery.TableFieldSchema)):
     schema = get_bq_tableschema(schema)
-  schema_fields = {field.name: field for field in schema.fields}
   beam_row = {}
-  for col_name, value in row.items():
-    # get this column's schema field and handle struct types
-    field = schema_fields[col_name]
-    if field.type.upper() in ["RECORD", "STRUCT"]:
+  for field in schema.fields:
+    name = field.name
+    mode = field.mode.upper()
+    type = field.type.upper()
+    # When writing with Storage Write API via xlang, we give the Beam Row
+    # PCollection a hint on the schema using `with_output_types`.
+    # This requires that each row has all the fields in the schema.
+    # However, it's possible that some nullable fields don't appear in the row.
+    # For this case, we create the field with a `None` value
+    if name not in row and mode == "NULLABLE":
+      row[name] = None
+
+    value = row[name]
+    if type in ["RECORD", "STRUCT"]:
       # if this is a list of records, we create a list of Beam Rows
-      if field.mode.upper() == "REPEATED":
+      if mode == "REPEATED":
         list_of_beam_rows = []
         for record in value:
           list_of_beam_rows.append(beam_row_from_dict(record, field))
-        beam_row[col_name] = list_of_beam_rows
-      # otherwise, create a Beam Row from this record
+        beam_row[name] = list_of_beam_rows
+        # otherwise, create a Beam Row from this record

Review Comment:
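   minor: this `# otherwise, create a Beam Row from this record` comment describes the non-`REPEATED` case. It now sits at the end of the `REPEATED` branch; it belongs at the top of the branch below that handles a single (non-repeated) record.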



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to