Repository: incubator-airflow

Updated Branches:
  refs/heads/master 59aba3064 -> eff68882b
[AIRFLOW-1613] make mysql_to_gcs_operator py3 compatible

Replaces calls to `json.dump` with `json.dumps` followed by
`tmp_file_handle.write` to write JSON lines to the ndjson file. Under
Python 3, `json.dumps` returns a unicode string rather than a byte
string, so the string is encoded to `utf-8`, which is compatible with
BigQuery (see:
https://cloud.google.com/bigquery/docs/loading-data#loading_encoded_data).

Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/2f79610a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/2f79610a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/2f79610a

Branch: refs/heads/master
Commit: 2f79610a3ef726e88dec238de000d9295ae7d2a9
Parents: 313f5ba
Author: Devon Peticolas <[email protected]>
Authored: Mon Nov 13 18:12:34 2017 -0500
Committer: Devon Peticolas <[email protected]>
Committed: Wed Nov 15 14:00:02 2017 -0500

----------------------------------------------------------------------
 airflow/contrib/operators/mysql_to_gcs.py | 20 +++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/2f79610a/airflow/contrib/operators/mysql_to_gcs.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/mysql_to_gcs.py b/airflow/contrib/operators/mysql_to_gcs.py
index f94bc24..784481d 100644
--- a/airflow/contrib/operators/mysql_to_gcs.py
+++ b/airflow/contrib/operators/mysql_to_gcs.py
@@ -12,6 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import sys
 import json
 import time
 
@@ -24,6 +25,8 @@ from decimal import Decimal
 from MySQLdb.constants import FIELD_TYPE
 from tempfile import NamedTemporaryFile
 
+PY3 = sys.version_info[0] == 3
+
 
 class MySqlToGoogleCloudStorageOperator(BaseOperator):
     """
@@ -131,10 +134,13 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
             row_dict = dict(zip(schema, row))
 
             # TODO validate that row isn't > 2MB. BQ enforces a hard row size of 2MB.
-            json.dump(row_dict, tmp_file_handle)
+            s = json.dumps(row_dict)
+            if PY3:
+                s = s.encode('utf-8')
+            tmp_file_handle.write(s)
 
             # Append newline to make dumps BigQuery compatible.
-            tmp_file_handle.write('\n')
+            tmp_file_handle.write(b'\n')
 
             # Stop if the file exceeds the file size limit.
             if tmp_file_handle.tell() >= self.approx_max_file_size_bytes:
@@ -170,7 +176,10 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
 
         self.log.info('Using schema for %s: %s', self.schema_filename, schema)
         tmp_schema_file_handle = NamedTemporaryFile(delete=True)
-        json.dump(schema, tmp_schema_file_handle)
+        s = json.dumps(schema)
+        if PY3:
+            s = s.encode('utf-8')
+        tmp_schema_file_handle.write(s)
         return {self.schema_filename: tmp_schema_file_handle}
 
     def _upload_to_gcs(self, files_to_upload):
@@ -178,8 +187,9 @@ class MySqlToGoogleCloudStorageOperator(BaseOperator):
         Upload all of the file splits (and optionally the schema .json file) to
         Google cloud storage.
""" - hook = GoogleCloudStorageHook(google_cloud_storage_conn_id=self.google_cloud_storage_conn_id, - delegate_to=self.delegate_to) + hook = GoogleCloudStorageHook( + google_cloud_storage_conn_id=self.google_cloud_storage_conn_id, + delegate_to=self.delegate_to) for object, tmp_file_handle in files_to_upload.items(): hook.upload(self.bucket, object, tmp_file_handle.name, 'application/json')
