whynick1 commented on a change in pull request #8049: [AIRFLOW-7117] Honor self.schema in sql_to_gcs as schema to upload
URL: https://github.com/apache/airflow/pull/8049#discussion_r402507732
 
 

 ##########
 File path: airflow/providers/google/cloud/operators/sql_to_gcs.py
 ##########
 @@ -251,18 +252,20 @@ def _get_col_type_dict(self):
     def _write_local_schema_file(self, cursor):
         """
         Takes a cursor, and writes the BigQuery schema for the results to a
-        local file system.
+        local file system. Schema for database will be read from cursor if
+        not specified.
 
         :return: A dictionary where key is a filename to be used as an object
             name in GCS, and values are file handles to local files that
             contains the BigQuery schema fields in .json format.
         """
-        schema = [self.field_to_bigquery(field) for field in cursor.description]
+        schema = self.schema or [self.field_to_bigquery(field) for field in cursor.description]
 
         self.log.info('Using schema for %s', self.schema_filename)
         self.log.debug("Current schema: %s", schema)
         tmp_schema_file_handle = NamedTemporaryFile(delete=True)
-        tmp_schema_file_handle.write(json.dumps(schema, sort_keys=True).encode('utf-8'))
+        schema_json = json.dumps(schema, sort_keys=True) if isinstance(schema, list) else schema
+        tmp_schema_file_handle.write(schema_json.encode('utf-8'))
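The `isinstance` guard in this hunk exists because `json.dumps()` does not pass a string through unchanged: it serializes the string itself. A minimal illustration using only the standard `json` module; the schema values are made up for the example:

```python
import json

# Hypothetical user-supplied schema that is already a JSON string.
user_schema_str = '[{"mode": "NULLABLE", "name": "id", "type": "INTEGER"}]'

# json.dumps() on a str wraps it in quotes and escapes the inner quotes,
# producing a JSON string literal rather than a JSON array.
double_encoded = json.dumps(user_schema_str)

# Decoding it gives back a plain str, not a list of field dicts.
assert isinstance(json.loads(double_encoded), str)

# The same schema as a list of field dicts serializes to the expected array;
# sort_keys=True orders the keys as mode, name, type.
user_schema_list = [{"mode": "NULLABLE", "name": "id", "type": "INTEGER"}]
encoded = json.dumps(user_schema_list, sort_keys=True)
assert json.loads(encoded) == user_schema_list
```

Without the guard, the uploaded schema file would contain a quoted JSON string literal instead of an array of field objects.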
 
 Review comment:
  Thanks for the review! I like the structure.
  Just to be clear, `self.schema` could be either `str` or `list`, and we don't want to call `json.dumps()` on a string. How about this edited version? @mik-laj
  ```python
          # Prefer the user-supplied schema; otherwise derive it from the cursor.
          if self.schema:
              self.log.info("Using user-supplied schema")
              schema = self.schema
          else:
              self.log.info("Generating schema from cursor description")
              schema = [self.field_to_bigquery(field) for field in cursor.description]

          # self.schema may already be a JSON string; only serialize lists.
          if isinstance(schema, list):
              schema = json.dumps(schema, sort_keys=True)

          self.log.info('Using schema for %s', self.schema_filename)
          self.log.debug("Current schema: %s", schema)

          tmp_schema_file_handle = NamedTemporaryFile(delete=True)
          tmp_schema_file_handle.write(schema.encode('utf-8'))
  ```
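A self-contained sketch of the proposed flow, pulled out of the operator class so it can run on its own. `field_to_bigquery`, `resolve_schema_json`, and `write_schema_file` are hypothetical stand-ins, not the operator's API; in particular, this `field_to_bigquery` stub types every column as STRING instead of mapping database types.

```python
import json
from tempfile import NamedTemporaryFile


def field_to_bigquery(field):
    """Hypothetical stand-in for the operator's field_to_bigquery().

    Maps one DB-API cursor.description entry (column name at index 0)
    to a BigQuery field dict; every column is typed as STRING here.
    """
    return {"name": field[0], "type": "STRING", "mode": "NULLABLE"}


def resolve_schema_json(user_schema, description):
    """Return the schema as a JSON string, preferring a user-supplied schema.

    user_schema may be None, a list of field dicts, or an already-serialized
    JSON string; description is a DB-API cursor.description sequence.
    """
    if user_schema:
        schema = user_schema
    else:
        # Fall back to deriving the schema from the cursor metadata.
        schema = [field_to_bigquery(field) for field in description]

    # Serialize lists only; a str is assumed to already be valid JSON.
    if isinstance(schema, list):
        schema = json.dumps(schema, sort_keys=True)
    return schema


def write_schema_file(user_schema, description):
    """Write the resolved schema to a temporary file and return its handle."""
    handle = NamedTemporaryFile(delete=True)
    handle.write(resolve_schema_json(user_schema, description).encode("utf-8"))
    handle.flush()  # push buffered bytes to disk so readers of handle.name see them
    return handle
```

Because the check is a truthiness test, an empty list or empty string passed as `self.schema` is treated the same as "not specified" and the schema is derived from the cursor.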

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
