[ https://issues.apache.org/jira/browse/BEAM-11277?focusedWorklogId=569148&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-569148 ]

ASF GitHub Bot logged work on BEAM-11277:
-----------------------------------------

                Author: ASF GitHub Bot
            Created on: 19/Mar/21 21:32
            Start Date: 19/Mar/21 21:32
    Worklog Time Spent: 10m 
      Work Description: chunyang commented on a change in pull request #14113:
URL: https://github.com/apache/beam/pull/14113#discussion_r597985491



##########
File path: sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
##########
@@ -313,6 +320,111 @@ def process(self, element, file_prefix, *schema_side_inputs):
       yield (destination, (file_path, file_size))
 
 
+class UpdateDestinationSchema(beam.DoFn):
+  """Update destination schema based on data that is about to be copied into 
it.
+
+  Unlike load and query jobs, BigQuery copy jobs do not support schema field
+  addition or relaxation on the destination table. This DoFn fills that gap by
+  updating the destination table schemas to be compatible with the data coming
+  from the source table so that schema field modification options are respected
+  regardless of whether data is loaded directly to the destination table or
+  loaded into temporary tables before being copied into the destination.
+
+  This transform takes as input a (destination, job_reference) pair where the
+  job_reference refers to a completed load job into a temporary table.
+
+  This transform emits (destination, job_reference) pairs where the
+  job_reference refers to a submitted load job for performing the schema
+  modification. Note that the input and output job references are not the same.
+
+  Experimental; no backwards compatibility guarantees.
+  """
+  def __init__(
+      self,
+      write_disposition=None,
+      test_client=None,
+      additional_bq_parameters=None,
+      step_name=None):
+    self._test_client = test_client
+    self._write_disposition = write_disposition
+    self._additional_bq_parameters = additional_bq_parameters or {}
+    self._step_name = step_name
+
+  def setup(self):
+    self._bq_wrapper = bigquery_tools.BigQueryWrapper(client=self._test_client)
+    self._bq_io_metadata = create_bigquery_io_metadata(self._step_name)
+
+  def process(self, element, schema_mod_job_name_prefix):
+    destination = element[0]
+    temp_table_load_job_reference = element[1]
+
+    if callable(self._additional_bq_parameters):
+      additional_parameters = self._additional_bq_parameters(destination)
+    elif isinstance(self._additional_bq_parameters, vp.ValueProvider):
+      additional_parameters = self._additional_bq_parameters.get()
+    else:
+      additional_parameters = self._additional_bq_parameters
+
+    # When writing to normal tables WRITE_TRUNCATE will overwrite the schema but
+    # when writing to a partition, care needs to be taken to update the schema
+    # even on WRITE_TRUNCATE.
+    if (self._write_disposition not in ('WRITE_TRUNCATE', 'WRITE_APPEND') or
+        not additional_parameters or
+        not additional_parameters.get("schemaUpdateOptions")):
+      # No need to modify schema of destination table
+      return
+
+    table_reference = bigquery_tools.parse_table_reference(destination)
+    if table_reference.projectId is None:
+      table_reference.projectId = vp.RuntimeValueProvider.get_value(
+          'project', str, '')
+
+    try:
+      # Check if destination table exists
+      _ = self._bq_wrapper.get_table(
+          project_id=table_reference.projectId,
+          dataset_id=table_reference.datasetId,
+          table_id=table_reference.tableId)
+    except HttpError as exn:
+      if exn.status_code == 404:
+        # Destination table does not exist, so no need to modify its schema
+        # ahead of the copy jobs.
+        return
+      else:
+        raise
+
+    temp_table_load_job = self._bq_wrapper.get_job(
+        project=temp_table_load_job_reference.projectId,
+        job_id=temp_table_load_job_reference.jobId,
+        location=temp_table_load_job_reference.location)
+    temp_table_schema = temp_table_load_job.configuration.load.schema
+

Review comment:
   Added a simple comparison of `destination_table.schema == temp_table_schema`. It will work for trivial cases but doesn't catch the cases where the order of fields in a record differs. E.g., the following schemas are different according to `==` even though the temp table can be directly appended to the destination table without error.
   
   ```
   <TableSchema
     fields: [
       <TableFieldSchema fields: [], name: 'bytes', type: 'BYTES'>,
       <TableFieldSchema fields: [], name: 'date', type: 'DATE'>,
       <TableFieldSchema fields: [], name: 'time', type: 'TIME'>
     ]>
   ```
   ```
   <TableSchema
     fields: [
       <TableFieldSchema fields: [], name: 'date', type: 'DATE'>,
       <TableFieldSchema fields: [], name: 'time', type: 'TIME'>,
       <TableFieldSchema fields: [], name: 'bytes', type: 'BYTES'>
     ]>
   ```
   
   I can probably write a function to check the schema recursively but do you know if one already exists?
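
For illustration, a minimal sketch of such an order-insensitive, recursive comparison (a hypothetical helper, not part of the PR; it assumes the apitools-generated `TableSchema`/`TableFieldSchema` message classes used by `bigquery_tools`, and ignores `description`):

```
def schemas_equivalent(schema_a, schema_b):
  """Hypothetical helper: True if two TableSchema objects describe the same
  fields, ignoring the order of fields at every nesting level."""
  def normalize(fields):
    # Map each field name to (type, mode, nested fields), recursing into
    # RECORD fields. An unset mode is treated as NULLABLE, BigQuery's default.
    return {
        f.name: (f.type, f.mode or 'NULLABLE', normalize(f.fields or []))
        for f in fields
    }

  return normalize(schema_a.fields or []) == normalize(schema_b.fields or [])
```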




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Issue Time Tracking
-------------------

    Worklog Id:     (was: 569148)
    Time Spent: 6.5h  (was: 6h 20m)

> WriteToBigQuery with batch file loads does not respect schema update options 
> when there are multiple load jobs
> --------------------------------------------------------------------------------------------------------------
>
>                 Key: BEAM-11277
>                 URL: https://issues.apache.org/jira/browse/BEAM-11277
>             Project: Beam
>          Issue Type: Bug
>          Components: io-py-gcp, runner-dataflow
>    Affects Versions: 2.21.0, 2.24.0, 2.25.0, 2.28.0
>            Reporter: Chun Yang
>            Assignee: Chun Yang
>            Priority: P2
>         Attachments: repro.py
>
>          Time Spent: 6.5h
>  Remaining Estimate: 0h
>
> When multiple load jobs are needed to write data to a destination table, 
> e.g., when the data is spread over more than 
> [10,000|https://cloud.google.com/bigquery/quotas#load_jobs] URIs, 
> WriteToBigQuery in FILE_LOADS mode will write data into temporary tables and 
> then copy the temporary tables into the destination table.
> When WriteToBigQuery is used with 
> {{write_disposition=BigQueryDisposition.WRITE_APPEND}} and 
> {{additional_bq_parameters=\{"schemaUpdateOptions": 
> ["ALLOW_FIELD_ADDITION"]\}}}, the schema update options are not respected by 
> the jobs that copy data from temporary tables into the destination table. The 
> effect is that for small jobs (<10K source URIs), schema field addition is 
> allowed, however, if the job is scaled to >10K source URIs, then schema field 
> addition will fail with an error such as:
> {code:none}
> Provided Schema does not match Table project:dataset.table. Cannot add fields (field: field_name)
> {code}
> I've been able to reproduce this issue with Python 3.7 and DataflowRunner on 
> Beam 2.21.0 and Beam 2.25.0. I could not reproduce the issue with 
> DirectRunner. A minimal reproducible example is attached.
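
For context, a minimal sketch of the configuration described above (the table name, schema, and input rows are placeholders, not from the issue; per the report, the failure only surfaces once the load is large enough to require temporary tables and copy jobs):

```
import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery

# Placeholder data and schema; the added field is what ALLOW_FIELD_ADDITION
# is expected to permit on the existing destination table.
rows = [{'existing_field': 'a', 'new_field': 1}]
schema = 'existing_field:STRING,new_field:INTEGER'

with beam.Pipeline() as pipeline:
  _ = (
      pipeline
      | beam.Create(rows)
      | WriteToBigQuery(
          'my-project:my_dataset.my_table',
          schema=schema,
          method=WriteToBigQuery.Method.FILE_LOADS,
          write_disposition=BigQueryDisposition.WRITE_APPEND,
          additional_bq_parameters={
              'schemaUpdateOptions': ['ALLOW_FIELD_ADDITION']
          }))
```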



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
