potiuk commented on a change in pull request #5504: [AIRFLOW-4869] Reorganize 
sql to gcs operators.
URL: https://github.com/apache/airflow/pull/5504#discussion_r307998362
 
 

 ##########
 File path: airflow/contrib/operators/mysql_to_gcs.py
 ##########
 @@ -170,139 +85,15 @@ def _query_mysql(self):
         cursor.execute(self.sql)
         return cursor
 
-    def _write_local_data_files(self, cursor):
-        """
-        Takes a cursor, and writes results to a local file.
-
-        :return: A dictionary where keys are filenames to be used as object
-            names in GCS, and values are file handles to local files that
-            contain the data for the GCS objects.
-        """
-        schema = list(map(lambda schema_tuple: schema_tuple[0], 
cursor.description))
-        col_type_dict = self._get_col_type_dict()
-        file_no = 0
-        tmp_file_handle = NamedTemporaryFile(delete=True)
-        if self.export_format == 'csv':
-            file_mime_type = 'text/csv'
-        else:
-            file_mime_type = 'application/json'
-        files_to_upload = [{
-            'file_name': self.filename.format(file_no),
-            'file_handle': tmp_file_handle,
-            'file_mime_type': file_mime_type
-        }]
-
-        if self.export_format == 'csv':
-            csv_writer = self._configure_csv_file(tmp_file_handle, schema)
-
-        for row in cursor:
-            # Convert datetime objects to utc seconds, and decimals to floats.
-            # Convert binary type object to string encoded with base64.
-            row = self._convert_types(schema, col_type_dict, row)
-
-            if self.export_format == 'csv':
-                csv_writer.writerow(row)
-            else:
-                row_dict = dict(zip(schema, row))
-
-                # TODO validate that row isn't > 2MB. BQ enforces a hard row 
size of 2MB.
-                s = json.dumps(row_dict, sort_keys=True).encode('utf-8')
-                tmp_file_handle.write(s)
-
-                # Append newline to make dumps BigQuery compatible.
-                tmp_file_handle.write(b'\n')
-
-            # Stop if the file exceeds the file size limit.
-            if tmp_file_handle.tell() >= self.approx_max_file_size_bytes:
-                file_no += 1
-                tmp_file_handle = NamedTemporaryFile(delete=True)
-                files_to_upload.append({
-                    'file_name': self.filename.format(file_no),
-                    'file_handle': tmp_file_handle,
-                    'file_mime_type': file_mime_type
-                })
-
-                if self.export_format == 'csv':
-                    csv_writer = self._configure_csv_file(tmp_file_handle, 
schema)
-
-        return files_to_upload
-
-    def _configure_csv_file(self, file_handle, schema):
-        """Configure a csv writer with the file_handle and write schema
-        as headers for the new file.
-        """
-        csv_writer = csv.writer(file_handle, encoding='utf-8',
-                                delimiter=self.field_delimiter)
-        csv_writer.writerow(schema)
-        return csv_writer
-
-    def _write_local_schema_file(self, cursor):
-        """
-        Takes a cursor, and writes the BigQuery schema in .json format for the
-        results to a local file system.
-
-        :return: A dictionary where key is a filename to be used as an object
-            name in GCS, and values are file handles to local files that
-            contains the BigQuery schema fields in .json format.
-        """
-        schema_str = None
-        schema_file_mime_type = 'application/json'
-        tmp_schema_file_handle = NamedTemporaryFile(delete=True)
-        if self.schema is not None and isinstance(self.schema, str):
-            schema_str = self.schema.encode('utf-8')
-        elif self.schema is not None and isinstance(self.schema, list):
-            schema_str = json.dumps(self.schema).encode('utf-8')
-        else:
-            schema = []
-            for field in cursor.description:
-                # See PEP 249 for details about the description tuple.
-                field_name = field[0]
-                field_type = self.type_map.get(field[1], "STRING")
-                # Always allow TIMESTAMP to be nullable. MySQLdb returns None 
types
-                # for required fields because some MySQL timestamps can't be
-                # represented by Python's datetime (e.g. 0000-00-00 00:00:00).
-                if field[6] or field_type == 'TIMESTAMP':
-                    field_mode = 'NULLABLE'
-                else:
-                    field_mode = 'REQUIRED'
-                schema.append({
-                    'name': field_name,
-                    'type': field_type,
-                    'mode': field_mode,
-                })
-            schema_str = json.dumps(schema, sort_keys=True).encode('utf-8')
-        tmp_schema_file_handle.write(schema_str)
-
-        self.log.info('Using schema for %s: %s', self.schema_filename, 
schema_str)
-        schema_file_to_upload = {
-            'file_name': self.schema_filename,
-            'file_handle': tmp_schema_file_handle,
-            'file_mime_type': schema_file_mime_type
+    def field_to_bigquery(self, field):
+        field_type = self.type_map.get(field[1], "STRING")
+        return {
+            'name': field[0],
+            'type': field_type,
+            'mode': "NULLABLE" if field[6] or field_type == "TIMESTAMP" else 
"REQUIRED",
 
 Review comment:
   The comment that was previous there about TIMESTAMP nullable should be 
copied here. It's not obvious where it comes from.
   ```
   # Always allow TIMESTAMP to be nullable. MySQLdb returns None types
   # for required fields because some MySQL timestamps can't be
   # represented by Python's datetime (e.g. 0000-00-00 00:00:00).
   ```

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to