Repository: incubator-airflow
Updated Branches:
  refs/heads/v1-9-test 5a92f0676 -> ef4f98840


Revert "[AIRFLOW-1613] Handle binary field in MySqlToGoogleCloudStorageOperator"

Reverting due to improper handling of the binary column flags (cursor.description_flags).

This reverts commit d578b292e96d5fdd87b5168508005cd73edc4f96.


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/ef4f9884
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/ef4f9884
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/ef4f9884

Branch: refs/heads/v1-9-test
Commit: ef4f98840d55a62a102bda45d72a55cf618a1412
Parents: 5a92f06
Author: Chris Riccomini <criccom...@apache.org>
Authored: Fri Oct 13 16:55:22 2017 -0700
Committer: Chris Riccomini <criccom...@apache.org>
Committed: Fri Oct 13 16:55:56 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py    |  4 +--
 airflow/contrib/operators/mysql_to_gcs.py | 48 +++++++-------------------
 2 files changed, 14 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ef4f9884/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index fab2a43..5fc7e22 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -971,9 +971,7 @@ def _bq_cast(string_field, bq_type):
     if string_field is None:
         return None
     elif bq_type == 'INTEGER' or bq_type == 'TIMESTAMP':
-        # convert to float first to handle cases where string_field is
-        # represented in scientific notation
-        return int(float(string_field))
+        return int(string_field)
     elif bq_type == 'FLOAT':
         return float(string_field)
     elif bq_type == 'BOOLEAN':
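
The comment removed in this hunk is the rationale for the double cast: the
string_field value may be represented in scientific notation, which int()
alone rejects. A minimal illustration in plain Python (not Airflow code):

    >>> int('1.5e5')
    Traceback (most recent call last):
        ...
    ValueError: invalid literal for int() with base 10: '1.5e5'
    >>> int(float('1.5e5'))
    150000

So after the revert, _bq_cast('1.5e5', 'INTEGER') raises ValueError again;
that is a side effect of rolling back the whole commit rather than a goal of
the revert itself.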

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/ef4f9884/airflow/contrib/operators/mysql_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mysql_to_gcs.py b/airflow/contrib/operators/mysql_to_gcs.py
index 47b7ac9..f94bc24 100644
--- a/airflow/contrib/operators/mysql_to_gcs.py
+++ b/airflow/contrib/operators/mysql_to_gcs.py
@@ -14,7 +14,6 @@
 
 import json
 import time
-import base64
 
 from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
 from airflow.hooks.mysql_hook import MySqlHook
@@ -22,7 +21,7 @@ from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
 from datetime import date, datetime
 from decimal import Decimal
-from MySQLdb.constants import FIELD_TYPE, FLAG
+from MySQLdb.constants import FIELD_TYPE
 from tempfile import NamedTemporaryFile
 
 
@@ -121,20 +120,15 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             names in GCS, and values are file handles to local files that
             contain the data for the GCS objects.
         """
-        field_names = list(map(lambda schema_tuple: schema_tuple[0], cursor.description))
-        mysql_types = list(map(lambda schema_tuple: schema_tuple[1], cursor.description))
-        byte_fields = [self.is_binary(t, f) for t, f in zip(mysql_types, cursor.description_flags)]
-
+        schema = list(map(lambda schema_tuple: schema_tuple[0], cursor.description))
         file_no = 0
-        tmp_file_handle = NamedTemporaryFile(mode='w', delete=True)
+        tmp_file_handle = NamedTemporaryFile(delete=True)
         tmp_file_handles = {self.filename.format(file_no): tmp_file_handle}
 
         for row in cursor:
-            # Convert datetime objects to utc seconds, decimals to floats, and binaries
-            # to base64-encoded strings
-            row_dict = {}
-            for name, value, is_binary in zip(field_names, row, byte_fields):
-                row_dict[name] = self.convert_types(value, is_binary)
+            # Convert datetime objects to utc seconds, and decimals to floats
+            row = map(self.convert_types, row)
+            row_dict = dict(zip(schema, row))
 
             # TODO validate that row isn't > 2MB. BQ enforces a hard row size of 2MB.
             json.dump(row_dict, tmp_file_handle)
@@ -145,7 +139,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             # Stop if the file exceeds the file size limit.
             if tmp_file_handle.tell() >= self.approx_max_file_size_bytes:
                 file_no += 1
-                tmp_file_handle = NamedTemporaryFile(mode='w', delete=True)
+                tmp_file_handle = NamedTemporaryFile(delete=True)
                 tmp_file_handles[self.filename.format(file_no)] = tmp_file_handle
 
         return tmp_file_handles
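
The NamedTemporaryFile hunks in this file (here and in the schema hunk below)
drop the explicit mode='w' that the reverted commit had added, presumably for
Python 3 compatibility: NamedTemporaryFile defaults to 'w+b', and json.dump
writes str, so under Python 3 a default-mode handle fails with TypeError
(Python 2 is unaffected because str is bytes there). A small self-contained
sketch, not operator code:

    import json
    from tempfile import NamedTemporaryFile

    with NamedTemporaryFile(delete=True) as fh:        # default mode is 'w+b'
        try:
            json.dump({'a': 1}, fh)                    # str into a bytes file
        except TypeError as exc:
            print('fails on Python 3:', exc)

    with NamedTemporaryFile(mode='w', delete=True) as fh:  # explicit text mode
        json.dump({'a': 1}, fh)                            # works on 2 and 3
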
@@ -160,12 +154,10 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             contains the BigQuery schema fields in .json format.
         """
         schema = []
-        for field, flag in zip(cursor.description, cursor.description_flags):
+        for field in cursor.description:
             # See PEP 249 for details about the description tuple.
             field_name = field[0]
-
-            field_type = self.type_map(field[1], flag)
-
+            field_type = self.type_map(field[1])
             # Always allow TIMESTAMP to be nullable. MySQLdb returns None types
             # for required fields because some MySQL timestamps can't be
             # represented by Python's datetime (e.g. 0000-00-00 00:00:00).
@@ -177,7 +169,7 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             })
 
         self.log.info('Using schema for %s: %s', self.schema_filename, schema)
-        tmp_schema_file_handle = NamedTemporaryFile(mode='w', delete=True)
+        tmp_schema_file_handle = NamedTemporaryFile(delete=True)
         json.dump(schema, tmp_schema_file_handle)
         return {self.schema_filename: tmp_schema_file_handle}
 
@@ -192,24 +184,21 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             hook.upload(self.bucket, object, tmp_file_handle.name, 'application/json')
 
     @classmethod
-    def convert_types(cls, value, is_binary=False):
+    def convert_types(cls, value):
         """
         Takes a value from MySQLdb, and converts it to a value that's safe for
         JSON/Google cloud storage/BigQuery. Dates are converted to UTC seconds.
-        Decimals are converted to floats. Binaries are converted to base64-encoded
-        strings.
+        Decimals are converted to floats.
         """
         if type(value) in (datetime, date):
             return time.mktime(value.timetuple())
         elif isinstance(value, Decimal):
             return float(value)
-        elif is_binary:
-            return base64.b64encode(value).decode()
         else:
             return value
 
     @classmethod
-    def type_map(cls, mysql_type, flags):
+    def type_map(cls, mysql_type):
         """
         Helper function that maps from MySQL fields to BigQuery fields. Used
         when a schema_filename is set.
@@ -231,15 +220,4 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             FIELD_TYPE.TIMESTAMP: 'TIMESTAMP',
             FIELD_TYPE.YEAR: 'INTEGER',
         }
-
-        if MySqlToGoogleCloudStorageOperator.is_binary(mysql_type, flags):
-            return 'BYTES'
-
         return d[mysql_type] if mysql_type in d else 'STRING'
-
-    @classmethod
-    def is_binary(cls, mysql_type, flags):
-        # MySQLdb groups both char/varchar and binary/varbinary as STRING/VAR_STRING.
-        # To work around this ambiguity, check the description flag to see if it's a binary field.
-        return mysql_type in [FIELD_TYPE.STRING, FIELD_TYPE.VAR_STRING] and \
-            flags & FLAG.BINARY == FLAG.BINARY
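
For reference, the binary handling removed above amounts to two small pieces:
a flag check that tells binary/varbinary apart from char/varchar (MySQLdb
reports both as STRING/VAR_STRING), and a base64 conversion so the value can
travel through JSON into a BigQuery BYTES column. A standalone restatement of
the reverted logic, illustrative only and assuming the cursor actually exposes
the per-column description_flags the reverted commit relied on:

    import base64

    from MySQLdb.constants import FIELD_TYPE, FLAG

    def is_binary(mysql_type, flags):
        # The BINARY flag is what distinguishes binary/varbinary from
        # char/varchar, since both map to STRING/VAR_STRING.
        return (mysql_type in (FIELD_TYPE.STRING, FIELD_TYPE.VAR_STRING)
                and flags & FLAG.BINARY == FLAG.BINARY)

    def convert_binary(value):
        # JSON cannot carry raw bytes, so the reverted code shipped them
        # as base64-encoded strings.
        return base64.b64encode(value).decode()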
