Conrad Lee created AIRFLOW-3316:
-----------------------------------

Summary: GCS to BQ operator leaves schema_fields operator unset when autodetect=True
Key: AIRFLOW-3316
URL: https://issues.apache.org/jira/browse/AIRFLOW-3316
Project: Apache Airflow
Issue Type: Bug
Components: operators
Affects Versions: 1.10.1
Reporter: Conrad Lee
Assignee: Conrad Lee
When I use the GoogleCloudStorageToBigQueryOperator to load Parquet data into BigQuery, I leave the schema_fields argument set to None and set autodetect=True. This causes the following error:

{code}
[2018-11-08 09:42:03,690] {models.py:1736} ERROR - local variable 'schema_fields' referenced before assignment
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models.py", line 1633, in _run_raw_task
    result = task_copy.execute(context=context)
  File "/home/airflow/gcs/plugins/bq_operator_updated.py", line 2018, in execute
    schema_fields=schema_fields
UnboundLocalError: local variable 'schema_fields' referenced before assignment
{code}

The problem is that the checks which assign the local schema_fields variable do not cover every case:

{code:python}
if not self.schema_fields:
    if self.schema_object and self.source_format != 'DATASTORE_BACKUP':
        gcs_hook = GoogleCloudStorageHook(
            google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
            delegate_to=self.delegate_to)
        schema_fields = json.loads(gcs_hook.download(
            self.bucket,
            self.schema_object).decode("utf-8"))
    elif self.schema_object is None and self.autodetect is False:
        raise ValueError('At least one of `schema_fields`, `schema_object`, '
                         'or `autodetect` must be passed.')
else:
    schema_fields = self.schema_fields
{code}

When schema_fields is empty, schema_object is None and autodetect is True, neither inner branch runs, so schema_fields is never assigned before it is passed on (the same happens when schema_object is set but source_format is 'DATASTORE_BACKUP'). After the `elif`, we need to handle the case where autodetect is set to True.
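The failure can be reproduced outside Airflow with a minimal sketch that copies only the branching above. `LoadConfig` and `download_schema` are hypothetical stand-ins for the operator and for GoogleCloudStorageHook.download(); they are not Airflow code. Recent Python 3 releases word the error message differently, but the exception type is the same.

{code:python}
import json


class LoadConfig:
    """Hypothetical stand-in for the operator's attributes (not Airflow code)."""

    def __init__(self, schema_fields=None, schema_object=None,
                 autodetect=False, source_format='CSV'):
        self.schema_fields = schema_fields
        self.schema_object = schema_object
        self.autodetect = autodetect
        self.source_format = source_format

    def download_schema(self):
        # Hypothetical stub standing in for GoogleCloudStorageHook.download().
        return b'[{"name": "id", "type": "INTEGER"}]'

    def resolve_schema_fields(self):
        # Same branching as the operator's original code.
        if not self.schema_fields:
            if self.schema_object and self.source_format != 'DATASTORE_BACKUP':
                schema_fields = json.loads(self.download_schema().decode("utf-8"))
            elif self.schema_object is None and self.autodetect is False:
                raise ValueError('At least one of `schema_fields`, `schema_object`, '
                                 'or `autodetect` must be passed.')
            # No else branch: with autodetect=True, schema_fields stays unassigned.
        else:
            schema_fields = self.schema_fields
        # Raises UnboundLocalError when no branch above assigned schema_fields.
        return schema_fields


try:
    LoadConfig(autodetect=True, source_format='PARQUET').resolve_schema_fields()
except UnboundLocalError as exc:
    print(type(exc).__name__)  # prints "UnboundLocalError"
{code}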
The case where autodetect is True can be handled by adding two lines, an `else` branch that explicitly sets schema_fields to None:

{code:python}
if not self.schema_fields:
    if self.schema_object and self.source_format != 'DATASTORE_BACKUP':
        gcs_hook = GoogleCloudStorageHook(
            google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
            delegate_to=self.delegate_to)
        schema_fields = json.loads(gcs_hook.download(
            self.bucket,
            self.schema_object).decode("utf-8"))
    elif self.schema_object is None and self.autodetect is False:
        raise ValueError('At least one of `schema_fields`, `schema_object`, '
                         'or `autodetect` must be passed.')
    else:
        # New branch: no explicit schema (e.g. autodetect=True).
        schema_fields = None
else:
    schema_fields = self.schema_fields
{code}
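To check that the extra `else` closes every gap, here is a sketch of the corrected check as a standalone function, exercised over every combination of inputs. `resolve_schema_fields` and `fake_download` are hypothetical names used only for this illustration; `fake_download` replaces the GCS download.

{code:python}
import itertools
import json


def resolve_schema_fields(configured_fields, schema_object, autodetect,
                          source_format, download):
    """Hypothetical standalone version of the corrected check."""
    if not configured_fields:
        if schema_object and source_format != 'DATASTORE_BACKUP':
            schema_fields = json.loads(download(schema_object).decode("utf-8"))
        elif schema_object is None and autodetect is False:
            raise ValueError('At least one of `schema_fields`, `schema_object`, '
                             'or `autodetect` must be passed.')
        else:
            # Reached with autodetect=True, or with a schema_object for a
            # DATASTORE_BACKUP source: no explicit schema is passed on.
            schema_fields = None
    else:
        schema_fields = configured_fields
    return schema_fields


def fake_download(schema_object):
    # Hypothetical stub standing in for GoogleCloudStorageHook.download().
    return b'[{"name": "id", "type": "INTEGER"}]'


# Every combination now returns a value or raises the intended ValueError;
# UnboundLocalError can no longer occur because every branch assigns.
for fields, obj, auto, fmt in itertools.product(
        [None, [{"name": "x", "type": "STRING"}]],
        [None, 'schema.json'],
        [False, True],
        ['PARQUET', 'DATASTORE_BACKUP']):
    try:
        resolve_schema_fields(fields, obj, auto, fmt, fake_download)
    except ValueError:
        # Only reachable when nothing at all describes the schema.
        assert fields is None and obj is None and auto is False
{code}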