Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-9-test 5a92f0676 -> ef4f98840

Revert "[AIRFLOW-1613] Handle binary field in MySqlToGoogleCloudStorageOperator"

Reverting due to improper handling of binary description_flag.

This reverts commit d578b292e96d5fdd87b5168508005cd73edc4f96.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ef4f9884
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ef4f9884
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ef4f9884

Branch: refs/heads/v1-9-test
Commit: ef4f98840d55a62a102bda45d72a55cf618a1412
Parents: 5a92f06
Author: Chris Riccomini <criccom...@apache.org>
Authored: Fri Oct 13 16:55:22 2017 -0700
Committer: Chris Riccomini <criccom...@apache.org>
Committed: Fri Oct 13 16:55:56 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py    |  4 +--
 airflow/contrib/operators/mysql_to_gcs.py | 48 +++++++-------------------
 2 files changed, 14 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ef4f9884/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index fab2a43..5fc7e22 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -971,9 +971,7 @@ def _bq_cast(string_field, bq_type):
     if string_field is None:
         return None
     elif bq_type == 'INTEGER' or bq_type == 'TIMESTAMP':
-        # convert to float first to handle cases where string_field is
-        # represented in scientific notation
-        return int(float(string_field))
+        return int(string_field)
     elif bq_type == 'FLOAT':
         return float(string_field)
     elif bq_type == 'BOOLEAN':

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ef4f9884/airflow/contrib/operators/mysql_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mysql_to_gcs.py b/airflow/contrib/operators/mysql_to_gcs.py
index 47b7ac9..f94bc24 100644
--- a/airflow/contrib/operators/mysql_to_gcs.py
+++ b/airflow/contrib/operators/mysql_to_gcs.py
@@ -14,7 +14,6 @@
 
 import json
 import time
-import base64
 
 from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
 from airflow.hooks.mysql_hook import MySqlHook
@@ -22,7 +21,7 @@ from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 from datetime import date, datetime
 from decimal import Decimal
-from MySQLdb.constants import FIELD_TYPE, FLAG
+from MySQLdb.constants import FIELD_TYPE
 from tempfile import NamedTemporaryFile
 
 
@@ -121,20 +120,15 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             names in GCS, and values are file handles to local files that
             contain the data for the GCS objects.
         """
-        field_names = list(map(lambda schema_tuple: schema_tuple[0], cursor.description))
-        mysql_types = list(map(lambda schema_tuple: schema_tuple[1], cursor.description))
-        byte_fields = [self.is_binary(t, f) for t, f in zip(mysql_types, cursor.description_flags)]
-
+        schema = list(map(lambda schema_tuple: schema_tuple[0], cursor.description))
         file_no = 0
-        tmp_file_handle = NamedTemporaryFile(mode='w', delete=True)
+        tmp_file_handle = NamedTemporaryFile(delete=True)
         tmp_file_handles = {self.filename.format(file_no): tmp_file_handle}
 
         for row in cursor:
-            # Convert datetime objects to utc seconds, decimals to floats, and binaries
-            # to base64-encoded strings
-            row_dict = {}
-            for name, value, is_binary in zip(field_names, row, byte_fields):
-                row_dict[name] = self.convert_types(value, is_binary)
+            # Convert datetime objects to utc seconds, and decimals to floats
+            row = map(self.convert_types, row)
+            row_dict = dict(zip(schema, row))
 
             # TODO validate that row isn't > 2MB. BQ enforces a hard row size of 2MB.
             json.dump(row_dict, tmp_file_handle)
@@ -145,7 +139,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             # Stop if the file exceeds the file size limit.
             if tmp_file_handle.tell() >= self.approx_max_file_size_bytes:
                 file_no += 1
-                tmp_file_handle = NamedTemporaryFile(mode='w', delete=True)
+                tmp_file_handle = NamedTemporaryFile(delete=True)
                 tmp_file_handles[self.filename.format(file_no)] = tmp_file_handle
 
         return tmp_file_handles
@@ -160,12 +154,10 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             contains the BigQuery schema fields in .json format.
         """
         schema = []
-        for field, flag in zip(cursor.description, cursor.description_flags):
+        for field in cursor.description:
             # See PEP 249 for details about the description tuple.
             field_name = field[0]
-
-            field_type = self.type_map(field[1], flag)
-
+            field_type = self.type_map(field[1])
             # Always allow TIMESTAMP to be nullable. MySQLdb returns None types
             # for required fields because some MySQL timestamps can't be
             # represented by Python's datetime (e.g. 0000-00-00 00:00:00).
@@ -177,7 +169,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             })
 
         self.log.info('Using schema for %s: %s', self.schema_filename, schema)
-        tmp_schema_file_handle = NamedTemporaryFile(mode='w', delete=True)
+        tmp_schema_file_handle = NamedTemporaryFile(delete=True)
         json.dump(schema, tmp_schema_file_handle)
         return {self.schema_filename: tmp_schema_file_handle}
 
@@ -192,24 +184,21 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             hook.upload(self.bucket, object, tmp_file_handle.name, 'application/json')
 
     @classmethod
-    def convert_types(cls, value, is_binary=False):
+    def convert_types(cls, value):
         """
         Takes a value from MySQLdb, and converts it to a value that's safe for
         JSON/Google cloud storage/BigQuery. Dates are converted to UTC seconds.
-        Decimals are converted to floats. Binaries are converted to base64-encoded
-        strings.
+        Decimals are converted to floats.
         """
         if type(value) in (datetime, date):
             return time.mktime(value.timetuple())
         elif isinstance(value, Decimal):
             return float(value)
-        elif is_binary:
-            return base64.b64encode(value).decode()
         else:
             return value
 
     @classmethod
-    def type_map(cls, mysql_type, flags):
+    def type_map(cls, mysql_type):
         """
         Helper function that maps from MySQL fields to BigQuery fields. Used
         when a schema_filename is set.
@@ -231,15 +220,4 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             FIELD_TYPE.TIMESTAMP: 'TIMESTAMP',
             FIELD_TYPE.YEAR: 'INTEGER',
         }
-
-        if MySqlToGoogleCloudStorageOperator.is_binary(mysql_type, flags):
-            return 'BYTES'
-
         return d[mysql_type] if mysql_type in d else 'STRING'
-
-    @classmethod
-    def is_binary(cls, mysql_type, flags):
-        # MySQLdb groups both char/varchar and binary/varbinary as STRING/VAR_STRING.
-        # To work around this ambiguity, check the description flag to see if it's a binary field.
-        return mysql_type in [FIELD_TYPE.STRING, FIELD_TYPE.VAR_STRING] and \
-            flags & FLAG.BINARY == FLAG.BINARY