[
https://issues.apache.org/jira/browse/AIRFLOW-3316?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16732848#comment-16732848
]
Conrad Lee commented on AIRFLOW-3316:
-------------------------------------
Hi [~jackjack10]. I think the conditions in the if statements are fine. The
first (outer) if statement handles the case where there are no schema
fields, which is exactly what the condition checks for.
I've created a PR that fixes the issue here:
https://github.com/apache/incubator-airflow/pull/4430
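To illustrate, the fixed branching can be sketched as a standalone function. This is a simplified sketch, not the actual operator code: `resolve_schema_fields` and `download_schema` are hypothetical names standing in for the operator's `execute` logic and the GCS hook download.

```python
import json


def resolve_schema_fields(schema_fields, schema_object, source_format,
                          autodetect, download_schema):
    """Mirror the fixed branching: return explicit schema fields, a schema
    downloaded from GCS, or None when BigQuery should autodetect.

    `download_schema` is a hypothetical stand-in for
    GoogleCloudStorageHook.download(...).decode("utf-8").
    """
    if not schema_fields:
        if schema_object and source_format != 'DATASTORE_BACKUP':
            # A schema object was given: load the schema JSON from GCS.
            return json.loads(download_schema(schema_object))
        elif schema_object is None and autodetect is False:
            # No schema source at all and no autodetect: fail loudly.
            raise ValueError('At least one of `schema_fields`, '
                             '`schema_object`, or `autodetect` must be '
                             'passed.')
        else:
            # The branch the bug left out: autodetect=True (or a
            # DATASTORE_BACKUP load), so leave the schema unset and let
            # BigQuery infer it.
            return None
    return schema_fields
```

With this shape, every path assigns a value, so the `UnboundLocalError` cannot occur.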
> GCS to BQ operator leaves schema_fields operator unset when autodetect=True
> ---------------------------------------------------------------------------
>
> Key: AIRFLOW-3316
> URL: https://issues.apache.org/jira/browse/AIRFLOW-3316
> Project: Apache Airflow
> Issue Type: Bug
> Components: operators
> Affects Versions: 1.10.1
> Reporter: Conrad Lee
> Assignee: Conrad Lee
> Priority: Minor
>
> When I use the GoogleCloudStorageToBigQueryOperator to load data from Parquet
> into BigQuery, I leave the schema_fields argument set to 'None' and set
> autodetect=True.
>
> This causes the following error:
>
> {code:java}
> [2018-11-08 09:42:03,690] {models.py:1736} ERROR - local variable
> 'schema_fields' referenced before assignment
> Traceback (most recent call last):
>   File "/usr/local/lib/airflow/airflow/models.py", line 1633, in _run_raw_task
>     result = task_copy.execute(context=context)
>   File "/home/airflow/gcs/plugins/bq_operator_updated.py", line 2018, in execute
>     schema_fields=schema_fields
> UnboundLocalError: local variable 'schema_fields' referenced before assignment
> {code}
>
> The problem is that this set of checks, in which the schema_fields variable is
> set, neglects to cover all the cases:
> {code:python}
> if not self.schema_fields:
>     if self.schema_object and self.source_format != 'DATASTORE_BACKUP':
>         gcs_hook = GoogleCloudStorageHook(
>             google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
>             delegate_to=self.delegate_to)
>         schema_fields = json.loads(gcs_hook.download(
>             self.bucket,
>             self.schema_object).decode("utf-8"))
>     elif self.schema_object is None and self.autodetect is False:
>         raise ValueError('At least one of `schema_fields`, `schema_object`, '
>                          'or `autodetect` must be passed.')
> else:
>     schema_fields = self.schema_fields
> {code}
> After the `elif`, we need to handle the case where autodetect is set to True.
> This can be done by adding an inner `else` branch that leaves the schema unset:
> {code:python}
> if not self.schema_fields:
>     if self.schema_object and self.source_format != 'DATASTORE_BACKUP':
>         gcs_hook = GoogleCloudStorageHook(
>             google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
>             delegate_to=self.delegate_to)
>         schema_fields = json.loads(gcs_hook.download(
>             self.bucket,
>             self.schema_object).decode("utf-8"))
>     elif self.schema_object is None and self.autodetect is False:
>         raise ValueError('At least one of `schema_fields`, `schema_object`, '
>                          'or `autodetect` must be passed.')
>     else:
>         schema_fields = None
> else:
>     schema_fields = self.schema_fields
> {code}
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)