Repository: incubator-airflow
Updated Branches:
  refs/heads/master 21e94c7d1 -> d578b292e
[AIRFLOW-1613] Handle binary field in MySqlToGoogleCloudStorageOperator

Closes #2680 from jgao54/write-binary

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d578b292
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d578b292
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d578b292

Branch: refs/heads/master
Commit: d578b292e96d5fdd87b5168508005cd73edc4f96
Parents: 21e94c7
Author: Joy Gao <[email protected]>
Authored: Wed Oct 11 13:06:17 2017 -0700
Committer: Chris Riccomini <[email protected]>
Committed: Wed Oct 11 13:06:17 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py    |  4 ++-
 airflow/contrib/operators/mysql_to_gcs.py | 48 +++++++++++++++++++-------
 2 files changed, 38 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d578b292/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 5fc7e22..fab2a43 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -971,7 +971,9 @@ def _bq_cast(string_field, bq_type):
     if string_field is None:
         return None
     elif bq_type == 'INTEGER' or bq_type == 'TIMESTAMP':
-        return int(string_field)
+        # convert to float first to handle cases where string_field is
+        # represented in scientific notation
+        return int(float(string_field))
     elif bq_type == 'FLOAT':
         return float(string_field)
     elif bq_type == 'BOOLEAN':


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d578b292/airflow/contrib/operators/mysql_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mysql_to_gcs.py b/airflow/contrib/operators/mysql_to_gcs.py
index f94bc24..47b7ac9 100644
--- a/airflow/contrib/operators/mysql_to_gcs.py
+++ b/airflow/contrib/operators/mysql_to_gcs.py
@@ -14,6 +14,7 @@
 
 import json
 import time
+import base64
 
 from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
 from airflow.hooks.mysql_hook import MySqlHook
@@ -21,7 +22,7 @@ from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 from datetime import date, datetime
 from decimal import Decimal
-from MySQLdb.constants import FIELD_TYPE
+from MySQLdb.constants import FIELD_TYPE, FLAG
 from tempfile import NamedTemporaryFile
 
 
@@ -120,15 +121,20 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             names in GCS, and values are file handles to local files that
             contain the data for the GCS objects.
""" - schema = list(map(lambda schema_tuple: schema_tuple[0], cursor.description)) + field_names = list(map(lambda schema_tuple: schema_tuple[0], cursor.description)) + mysql_types = list(map(lambda schema_tuple: schema_tuple[1], cursor.description)) + byte_fields = [self.is_binary(t, f) for t, f in zip(mysql_types, cursor.description_flags)] + file_no = 0 - tmp_file_handle = NamedTemporaryFile(delete=True) + tmp_file_handle = NamedTemporaryFile(mode='w', delete=True) tmp_file_handles = {self.filename.format(file_no): tmp_file_handle} for row in cursor: - # Convert datetime objects to utc seconds, and decimals to floats - row = map(self.convert_types, row) - row_dict = dict(zip(schema, row)) + # Convert datetime objects to utc seconds, decimals to floats, and binaries + # to base64-encoded strings + row_dict = {} + for name, value, is_binary in zip(field_names, row, byte_fields): + row_dict[name] = self.convert_types(value, is_binary) # TODO validate that row isn't > 2MB. BQ enforces a hard row size of 2MB. json.dump(row_dict, tmp_file_handle) @@ -139,7 +145,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator): # Stop if the file exceeds the file size limit. if tmp_file_handle.tell() >= self.approx_max_file_size_bytes: file_no += 1 - tmp_file_handle = NamedTemporaryFile(delete=True) + tmp_file_handle = NamedTemporaryFile(mode='w', delete=True) tmp_file_handles[self.filename.format(file_no)] = tmp_file_handle return tmp_file_handles @@ -154,10 +160,12 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator): contains the BigQuery schema fields in .json format. """ schema = [] - for field in cursor.description: + for field, flag in zip(cursor.description, cursor.description_flags): # See PEP 249 for details about the description tuple. field_name = field[0] - field_type = self.type_map(field[1]) + + field_type = self.type_map(field[1], flag) + # Always allow TIMESTAMP to be nullable. MySQLdb returns None types # for required fields because some MySQL timestamps can't be # represented by Python's datetime (e.g. 0000-00-00 00:00:00). @@ -169,7 +177,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator): }) self.log.info('Using schema for %s: %s', self.schema_filename, schema) - tmp_schema_file_handle = NamedTemporaryFile(delete=True) + tmp_schema_file_handle = NamedTemporaryFile(mode='w', delete=True) json.dump(schema, tmp_schema_file_handle) return {self.schema_filename: tmp_schema_file_handle} @@ -184,21 +192,24 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator): hook.upload(self.bucket, object, tmp_file_handle.name, 'application/json') @classmethod - def convert_types(cls, value): + def convert_types(cls, value, is_binary=False): """ Takes a value from MySQLdb, and converts it to a value that's safe for JSON/Google cloud storage/BigQuery. Dates are converted to UTC seconds. - Decimals are converted to floats. + Decimals are converted to floats. Binaries are converted to base64-encoded + strings. """ if type(value) in (datetime, date): return time.mktime(value.timetuple()) elif isinstance(value, Decimal): return float(value) + elif is_binary: + return base64.b64encode(value).decode() else: return value @classmethod - def type_map(cls, mysql_type): + def type_map(cls, mysql_type, flags): """ Helper function that maps from MySQL fields to BigQuery fields. Used when a schema_filename is set. 
@@ -220,4 +231,15 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             FIELD_TYPE.TIMESTAMP: 'TIMESTAMP',
             FIELD_TYPE.YEAR: 'INTEGER',
         }
+
+        if MySqlToGoogleCloudStorageOperator.is_binary(mysql_type, flags):
+            return 'BYTES'
+
         return d[mysql_type] if mysql_type in d else 'STRING'
+
+    @classmethod
+    def is_binary(cls, mysql_type, flags):
+        # MySQLdb groups both char/varchar and binary/varbinary as STRING/VAR_STRING.
+        # To work around this ambiguity, check the description flag to see if it's a binary field.
+        return mysql_type in [FIELD_TYPE.STRING, FIELD_TYPE.VAR_STRING] and \
+            flags & FLAG.BINARY == FLAG.BINARY
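
For context on the bigquery_hook.py part of this change: the added comment notes that
string_field can arrive in scientific notation, which a direct int() cast rejects. A
standalone sketch of the behavior the two-step cast fixes (not part of the commit; the
sample value is made up):

    # Sketch only; '1.507747577E9' is a hypothetical TIMESTAMP value.
    sci = '1.507747577E9'

    # int(sci) raises ValueError: invalid literal for int() with base 10,
    # while casting through float() first, as _bq_cast now does, succeeds:
    print(int(float(sci)))  # 1507747577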

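And a similar sketch for the mysql_to_gcs.py part: binary column values now flow through
convert_types(value, is_binary=True) and land in the JSON data files as base64 text, while
the schema file types those fields as BYTES so BigQuery can decode them on load. Illustrative
only; the sample bytes are made up:

    import base64

    raw = b'\x89PNG\r\n'                      # hypothetical VARBINARY cell value

    # What convert_types(raw, is_binary=True) writes into the JSON file:
    encoded = base64.b64encode(raw).decode()
    print(encoded)                            # iVBORw0K

    # With the field declared as BYTES in the schema, BigQuery decodes the
    # base64 text back to the original bytes when the file is loaded.
    assert base64.b64decode(encoded) == raw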