Conrad Lee created AIRFLOW-3316:
-----------------------------------

             Summary: GCS to BQ operator leaves schema_fields variable unset 
when autodetect=True
                 Key: AIRFLOW-3316
                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3316
             Project: Apache Airflow
          Issue Type: Bug
          Components: operators
    Affects Versions: 1.10.1
            Reporter: Conrad Lee
            Assignee: Conrad Lee


When I use the GoogleCloudStorageToBigQueryOperator to load data from Parquet 
into BigQuery, I leave the schema_fields argument set to 'None' and set 
autodetect=True.

 

This causes the following error: 

 
{code:java}
[2018-11-08 09:42:03,690] {models.py:1736} ERROR - local variable 
'schema_fields' referenced before assignment
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models.py", line 1633, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/airflow/gcs/plugins/bq_operator_updated.py", line 2018, in execute
    schema_fields=schema_fields
UnboundLocalError: local variable 'schema_fields' referenced before assignment
{code}
 

The problem is that the set of checks that assigns the schema_fields variable 
neglects to cover all the cases:
{code:python}
if not self.schema_fields:
    if self.schema_object and self.source_format != 'DATASTORE_BACKUP':
        gcs_hook = GoogleCloudStorageHook(
            google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
            delegate_to=self.delegate_to)

        schema_fields = json.loads(gcs_hook.download(
            self.bucket,
            self.schema_object).decode("utf-8"))
    elif self.schema_object is None and self.autodetect is False:
        raise ValueError('At least one of `schema_fields`, `schema_object`, '
                         'or `autodetect` must be passed.')
else:
    schema_fields = self.schema_fields
{code}
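The failure mode can be reproduced in isolation with a small sketch of the same branching logic (the function and its parameter names are illustrative; in the operator these are attributes on self and the GCS download is replaced by a stub):
{code:python}
def resolve_schema(self_schema_fields, schema_object, source_format, autodetect):
    # Mirrors the operator's current branching.
    if not self_schema_fields:
        if schema_object and source_format != 'DATASTORE_BACKUP':
            schema_fields = ['downloaded']  # stub for the GCS schema download
        elif schema_object is None and autodetect is False:
            raise ValueError('At least one of `schema_fields`, `schema_object`, '
                             'or `autodetect` must be passed.')
        # autodetect=True with no schema_object falls through here:
        # schema_fields is never assigned.
    else:
        schema_fields = self_schema_fields
    return schema_fields

try:
    resolve_schema(None, None, 'PARQUET', autodetect=True)
except UnboundLocalError as exc:
    print(type(exc).__name__)  # UnboundLocalError
{code}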
After the `elif` we need to handle the case where autodetect is set to True. 
This can be done by adding just two lines:
{code:python}
if not self.schema_fields:
    if self.schema_object and self.source_format != 'DATASTORE_BACKUP':
        gcs_hook = GoogleCloudStorageHook(
            google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
            delegate_to=self.delegate_to)

        schema_fields = json.loads(gcs_hook.download(
            self.bucket,
            self.schema_object).decode("utf-8"))
    elif self.schema_object is None and self.autodetect is False:
        raise ValueError('At least one of `schema_fields`, `schema_object`, '
                         'or `autodetect` must be passed.')
    else:
        schema_fields = None
else:
    schema_fields = self.schema_fields
{code}
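With the added else branch, the same standalone sketch of the branching logic (again with illustrative names and a stub for the GCS download) now returns None for the autodetect case instead of raising, which lets BigQuery infer the schema:
{code:python}
def resolve_schema_fixed(self_schema_fields, schema_object, source_format, autodetect):
    if not self_schema_fields:
        if schema_object and source_format != 'DATASTORE_BACKUP':
            schema_fields = ['downloaded']  # stub for the GCS schema download
        elif schema_object is None and autodetect is False:
            raise ValueError('At least one of `schema_fields`, `schema_object`, '
                             'or `autodetect` must be passed.')
        else:
            schema_fields = None  # autodetect=True: let BigQuery infer the schema
    else:
        schema_fields = self_schema_fields
    return schema_fields

print(resolve_schema_fixed(None, None, 'PARQUET', True))  # None
{code}
The ValueError for the "nothing passed at all" case is preserved; only the previously unhandled autodetect path gains an explicit assignment.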
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
