Conrad Lee created AIRFLOW-3316:
-----------------------------------
Summary: GCS to BQ operator leaves schema_fields variable unset when autodetect=True
Key: AIRFLOW-3316
URL: https://issues.apache.org/jira/browse/AIRFLOW-3316
Project: Apache Airflow
Issue Type: Bug
Components: operators
Affects Versions: 1.10.1
Reporter: Conrad Lee
Assignee: Conrad Lee
When I use the GoogleCloudStorageToBigQueryOperator to load data from Parquet
into BigQuery, I leave the schema_fields argument as None and set
autodetect=True.
This causes the following error:
{code:java}
[2018-11-08 09:42:03,690] {models.py:1736} ERROR - local variable 'schema_fields' referenced before assignment
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models.py", line 1633, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/airflow/gcs/plugins/bq_operator_updated.py", line 2018, in execute
    schema_fields=schema_fields
UnboundLocalError: local variable 'schema_fields' referenced before assignment
{code}
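The problem is that the checks which assign the local schema_fields variable
do not cover every case. When schema_fields is empty, schema_object is None,
and autodetect is True, no branch assigns schema_fields: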
{code:python}
if not self.schema_fields:
    if self.schema_object and self.source_format != 'DATASTORE_BACKUP':
        gcs_hook = GoogleCloudStorageHook(
            google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
            delegate_to=self.delegate_to)
        schema_fields = json.loads(gcs_hook.download(
            self.bucket,
            self.schema_object).decode("utf-8"))
    elif self.schema_object is None and self.autodetect is False:
        raise ValueError('At least one of `schema_fields`, `schema_object`, '
                         'or `autodetect` must be passed.')
    # Missing: with autodetect=True we fall through without assigning schema_fields.
else:
    schema_fields = self.schema_fields
{code}
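To see why no branch binds the variable, the check above can be lifted out of the operator into a standalone function. This is a minimal sketch under stated assumptions: `LoadConfig` is a hypothetical stand-in for the operator attributes the check reads, `resolve_schema_fields_current` is a hypothetical name, and the `download` callable stands in for `GoogleCloudStorageHook.download`, so no GCS access happens. With schema_fields=None, no schema_object, and autodetect=True, every branch is skipped and reading `schema_fields` raises `UnboundLocalError`, matching the traceback.

```python
import json
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class LoadConfig:
    # Hypothetical stand-in for the operator attributes read by the check.
    bucket: str
    schema_fields: Optional[List[dict]] = None
    schema_object: Optional[str] = None
    autodetect: bool = False
    source_format: str = 'CSV'


def resolve_schema_fields_current(
        cfg: LoadConfig,
        download: Callable[[str, str], bytes]) -> Optional[List[dict]]:
    """Mirrors the current branch logic (hypothetical helper, not Airflow code)."""
    if not cfg.schema_fields:
        if cfg.schema_object and cfg.source_format != 'DATASTORE_BACKUP':
            # `download` plays the role of gcs_hook.download(bucket, object).
            schema_fields = json.loads(
                download(cfg.bucket, cfg.schema_object).decode("utf-8"))
        elif cfg.schema_object is None and cfg.autodetect is False:
            raise ValueError('At least one of `schema_fields`, `schema_object`, '
                             'or `autodetect` must be passed.')
        # No branch handles autodetect=True, so `schema_fields` stays unbound.
    else:
        schema_fields = cfg.schema_fields
    return schema_fields  # raises UnboundLocalError in the unhandled case


if __name__ == "__main__":
    cfg = LoadConfig(bucket='example-bucket', source_format='PARQUET',
                     autodetect=True)
    try:
        resolve_schema_fields_current(cfg, lambda bucket, obj: b'[]')
    except UnboundLocalError as exc:
        print('UnboundLocalError:', exc)
```

The exact wording of the `UnboundLocalError` message differs between Python versions, but the exception type is the same one reported in the traceback.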
After the `elif`, we need an `else` branch for the remaining cases, such as
autodetect=True with no schema_object. Adding two lines is enough:
{code:python}
if not self.schema_fields:
    if self.schema_object and self.source_format != 'DATASTORE_BACKUP':
        gcs_hook = GoogleCloudStorageHook(
            google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
            delegate_to=self.delegate_to)
        schema_fields = json.loads(gcs_hook.download(
            self.bucket,
            self.schema_object).decode("utf-8"))
    elif self.schema_object is None and self.autodetect is False:
        raise ValueError('At least one of `schema_fields`, `schema_object`, '
                         'or `autodetect` must be passed.')
    else:
        # New: e.g. autodetect=True, so no explicit schema is passed.
        schema_fields = None
else:
    schema_fields = self.schema_fields
{code}
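A matching sketch of the patched logic, under the same assumptions (hypothetical `LoadConfig` and `resolve_schema_fields_fixed`, with a `download` callable standing in for the GCS hook), checks that every combination now either binds `schema_fields` or raises the `ValueError`. Returning None in the autodetect case presumably leaves schema inference to BigQuery, since the operator also passes `autodetect` through to the load job.

```python
import json
from dataclasses import dataclass
from typing import Callable, List, Optional


@dataclass
class LoadConfig:
    # Hypothetical stand-in for the operator attributes read by the check.
    bucket: str
    schema_fields: Optional[List[dict]] = None
    schema_object: Optional[str] = None
    autodetect: bool = False
    source_format: str = 'CSV'


def resolve_schema_fields_fixed(
        cfg: LoadConfig,
        download: Callable[[str, str], bytes]) -> Optional[List[dict]]:
    """Patched branch logic (hypothetical helper, not Airflow code)."""
    if not cfg.schema_fields:
        if cfg.schema_object and cfg.source_format != 'DATASTORE_BACKUP':
            # Load the JSON schema file; `download` replaces gcs_hook.download.
            schema_fields = json.loads(
                download(cfg.bucket, cfg.schema_object).decode("utf-8"))
        elif cfg.schema_object is None and cfg.autodetect is False:
            raise ValueError('At least one of `schema_fields`, `schema_object`, '
                             'or `autodetect` must be passed.')
        else:
            # Remaining cases, e.g. autodetect=True or a DATASTORE_BACKUP
            # source with a schema_object: pass no explicit schema.
            schema_fields = None
    else:
        schema_fields = cfg.schema_fields
    return schema_fields


def fake_download(bucket: str, obj: str) -> bytes:
    # Hypothetical stub returning a JSON schema file's bytes.
    return b'[{"name": "id", "type": "INTEGER"}]'


if __name__ == "__main__":
    cases = [
        LoadConfig(bucket='example-bucket', source_format='PARQUET', autodetect=True),
        LoadConfig(bucket='example-bucket', schema_object='schema.json'),
        LoadConfig(bucket='example-bucket',
                   schema_fields=[{'name': 'id', 'type': 'INTEGER'}]),
    ]
    for cfg in cases:
        print(resolve_schema_fields_fixed(cfg, fake_download))
```

Keeping the `else` explicit, rather than initialising `schema_fields = None` before the `if`, preserves the structure of the original check and makes the autodetect case visible to readers.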
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)