[AIRFLOW-868] Add postgres_to_gcs operator and unittests

Adds a postgres_to_gcs operator to contrib so that a user can copy a dump
from Postgres to Google Cloud Storage. Tests write to local
NamedTemporaryFiles so we correctly test serializing encoded ndjson in both
Python 3 and Python 2.7.
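For context, a minimal usage sketch of the new operator inside a DAG is shown
below; the DAG name, schedule, bucket, SQL, and connection IDs are
illustrative placeholders, and only the operator class and its parameters
come from this patch:

    # Hypothetical example DAG; names, SQL, and connection IDs are placeholders.
    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.postgres_to_gcs_operator import (
        PostgresToGoogleCloudStorageOperator)

    dag = DAG(
        dag_id='example_postgres_to_gcs',
        start_date=datetime(2017, 1, 1),
        schedule_interval='@daily')

    dump_table = PostgresToGoogleCloudStorageOperator(
        task_id='dump_table_to_gcs',
        postgres_conn_id='postgres_default',
        google_cloud_storage_conn_id='google_cloud_storage_default',
        sql='SELECT * FROM my_table',            # query whose results are dumped as ndjson
        bucket='my-gcs-bucket',                  # target GCS bucket
        filename='my_table/export_{}.ndjson',    # {} is replaced with the file split number
        schema_filename='my_table/schema.json',  # optional BigQuery schema object
        dag=dag)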
Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/d8fa2e90
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/d8fa2e90
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/d8fa2e90

Branch: refs/heads/master
Commit: d8fa2e9049328341dc58a635b34f04fa52de543e
Parents: 2f79610
Author: Adam Boscarino <[email protected]>
Authored: Sun Feb 12 18:27:15 2017 -0500
Committer: Devon Peticolas <[email protected]>
Committed: Wed Nov 15 14:34:59 2017 -0500

----------------------------------------------------------------------
 .../operators/postgres_to_gcs_operator.py      | 242 +++++++++++++++++++
 .../operators/test_postgres_to_gcs_operator.py | 153 ++++++++++++
 2 files changed, 395 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8fa2e90/airflow/contrib/operators/postgres_to_gcs_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/postgres_to_gcs_operator.py b/airflow/contrib/operators/postgres_to_gcs_operator.py
new file mode 100644
index 0000000..441ccf5
--- /dev/null
+++ b/airflow/contrib/operators/postgres_to_gcs_operator.py
@@ -0,0 +1,242 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+import json
+import logging
+import time
+import datetime
+
+from airflow.contrib.hooks.gcs_hook import GoogleCloudStorageHook
+from airflow.hooks.postgres_hook import PostgresHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+from decimal import Decimal
+from tempfile import NamedTemporaryFile
+
+PY3 = sys.version_info[0] == 3
+
+
+class PostgresToGoogleCloudStorageOperator(BaseOperator):
+    """
+    Copy data from Postgres to Google Cloud Storage in JSON format.
+    """
+    template_fields = ('sql', 'bucket', 'filename', 'schema_filename',
+                       'parameters')
+    template_ext = ('.sql', )
+    ui_color = '#a0e08c'
+
+    @apply_defaults
+    def __init__(self,
+                 sql,
+                 bucket,
+                 filename,
+                 schema_filename=None,
+                 approx_max_file_size_bytes=1900000000,
+                 postgres_conn_id='postgres_default',
+                 google_cloud_storage_conn_id='google_cloud_storage_default',
+                 delegate_to=None,
+                 parameters=None,
+                 *args,
+                 **kwargs):
+        """
+        :param sql: The SQL to execute on the Postgres table.
+        :type sql: string
+        :param bucket: The bucket to upload to.
+        :type bucket: string
+        :param filename: The filename to use as the object name when uploading
+            to Google Cloud Storage. A {} should be specified in the filename
+            to allow the operator to inject file numbers in cases where the
+            file is split due to size.
+        :type filename: string
+        :param schema_filename: If set, the filename to use as the object name
+            when uploading a .json file containing the BigQuery schema fields
+            for the table that was dumped from Postgres.
+        :type schema_filename: string
+        :param approx_max_file_size_bytes: This operator supports the ability
+            to split large table dumps into multiple files (see notes in the
+            filename param docs above). Google Cloud Storage allows for files
+            to be a maximum of 4GB. This param allows developers to specify the
+            file size of the splits.
+        :type approx_max_file_size_bytes: long
+        :param postgres_conn_id: Reference to a specific Postgres hook.
+        :type postgres_conn_id: string
+        :param google_cloud_storage_conn_id: Reference to a specific Google
+            cloud storage hook.
+        :type google_cloud_storage_conn_id: string
+        :param delegate_to: The account to impersonate, if any. For this to
+            work, the service account making the request must have domain-wide
+            delegation enabled.
+        :param parameters: a parameters dict that is substituted at query runtime.
+        :type parameters: dict
+        """
+        super(PostgresToGoogleCloudStorageOperator, self).__init__(*args, **kwargs)
+        self.sql = sql
+        self.bucket = bucket
+        self.filename = filename
+        self.schema_filename = schema_filename
+        self.approx_max_file_size_bytes = approx_max_file_size_bytes
+        self.postgres_conn_id = postgres_conn_id
+        self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
+        self.delegate_to = delegate_to
+        self.parameters = parameters
+
+    def execute(self, context):
+        cursor = self._query_postgres()
+        files_to_upload = self._write_local_data_files(cursor)
+
+        # If a schema is set, create a BQ schema JSON file.
+        if self.schema_filename:
+            files_to_upload.update(self._write_local_schema_file(cursor))
+
+        # Flush all files before uploading
+        for file_handle in files_to_upload.values():
+            file_handle.flush()
+
+        self._upload_to_gcs(files_to_upload)
+
+        # Close all temp file handles.
+        for file_handle in files_to_upload.values():
+            file_handle.close()
+
+    def _query_postgres(self):
+        """
+        Queries Postgres and returns a cursor to the results.
+        """
+        postgres = PostgresHook(postgres_conn_id=self.postgres_conn_id)
+        conn = postgres.get_conn()
+        cursor = conn.cursor()
+        cursor.execute(self.sql, self.parameters)
+        return cursor
+
+    def _write_local_data_files(self, cursor):
+        """
+        Takes a cursor, and writes results to a local file.
+
+        :return: A dictionary where keys are filenames to be used as object
+            names in GCS, and values are file handles to local files that
+            contain the data for the GCS objects.
+        """
+        schema = list(map(lambda schema_tuple: schema_tuple[0], cursor.description))
+        file_no = 0
+        tmp_file_handle = NamedTemporaryFile(delete=True)
+        tmp_file_handles = {self.filename.format(file_no): tmp_file_handle}
+
+        for row in cursor:
+            # Convert datetime objects to utc seconds, and decimals to floats
+            row = map(self.convert_types, row)
+            row_dict = dict(zip(schema, row))
+
+            s = json.dumps(row_dict, sort_keys=True)
+            if PY3:
+                s = s.encode('utf-8')
+            tmp_file_handle.write(s)
+
+            # Append newline to make dumps BigQuery compatible.
+            tmp_file_handle.write(b'\n')
+
+            # Stop if the file exceeds the file size limit.
+            if tmp_file_handle.tell() >= self.approx_max_file_size_bytes:
+                file_no += 1
+                tmp_file_handle = NamedTemporaryFile(delete=True)
+                tmp_file_handles[self.filename.format(file_no)] = tmp_file_handle
+
+        return tmp_file_handles
+
+    def _write_local_schema_file(self, cursor):
+        """
+        Takes a cursor, and writes the BigQuery schema for the results to a
+        local file system.
+
+        :return: A dictionary where the key is a filename to be used as an object
+            name in GCS, and the value is a file handle to a local file that
+            contains the BigQuery schema fields in .json format.
+        """
+        schema = []
+        for field in cursor.description:
+            # See PEP 249 for details about the description tuple.
+            field_name = field[0]
+            field_type = self.type_map(field[1])
+            field_mode = 'REPEATED' if field[1] in (1009, 1005, 1007,
+                                                    1016) else 'NULLABLE'
+            schema.append({
+                'name': field_name,
+                'type': field_type,
+                'mode': field_mode,
+            })
+
+        logging.info('Using schema for %s: %s', self.schema_filename, schema)
+        tmp_schema_file_handle = NamedTemporaryFile(delete=True)
+        s = json.dumps(schema, sort_keys=True)
+        if PY3:
+            s = s.encode('utf-8')
+        tmp_schema_file_handle.write(s)
+        return {self.schema_filename: tmp_schema_file_handle}
+
+    def _upload_to_gcs(self, files_to_upload):
+        """
+        Upload all of the file splits (and optionally the schema .json file) to
+        Google Cloud Storage.
+        """
+        hook = GoogleCloudStorageHook(
+            google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
+            delegate_to=self.delegate_to)
+        for object, tmp_file_handle in files_to_upload.items():
+            hook.upload(self.bucket, object, tmp_file_handle.name,
+                        'application/json')
+
+    @classmethod
+    def convert_types(cls, value):
+        """
+        Takes a value from Postgres, and converts it to a value that's safe for
+        JSON/Google Cloud Storage/BigQuery. Dates are converted to UTC seconds.
+        Decimals are converted to floats. Times are converted to seconds.
+        """
+        if type(value) in (datetime.datetime, datetime.date):
+            return time.mktime(value.timetuple())
+        elif type(value) == datetime.time:
+            formated_time = time.strptime(str(value), "%H:%M:%S")
+            return datetime.timedelta(
+                hours=formated_time.tm_hour,
+                minutes=formated_time.tm_min,
+                seconds=formated_time.tm_sec).seconds
+        elif isinstance(value, Decimal):
+            return float(value)
+        else:
+            return value
+
+    @classmethod
+    def type_map(cls, postgres_type):
+        """
+        Helper function that maps from Postgres fields to BigQuery fields. Used
+        when a schema_filename is set.
+        """
+        d = {
+            1114: 'TIMESTAMP',
+            1184: 'TIMESTAMP',
+            1082: 'TIMESTAMP',
+            1083: 'TIMESTAMP',
+            1005: 'INTEGER',
+            1007: 'INTEGER',
+            1016: 'INTEGER',
+            20: 'INTEGER',
+            21: 'INTEGER',
+            23: 'INTEGER',
+            16: 'BOOLEAN',
+            700: 'FLOAT',
+            701: 'FLOAT',
+            1700: 'FLOAT'
+        }
+
+        return d[postgres_type] if postgres_type in d else 'STRING'


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/d8fa2e90/tests/contrib/operators/test_postgres_to_gcs_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_postgres_to_gcs_operator.py b/tests/contrib/operators/test_postgres_to_gcs_operator.py
new file mode 100644
index 0000000..9567243
--- /dev/null
+++ b/tests/contrib/operators/test_postgres_to_gcs_operator.py
@@ -0,0 +1,153 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+from __future__ import absolute_import
+from __future__ import division
+from __future__ import print_function
+from __future__ import unicode_literals
+
+import sys
+import unittest
+
+from airflow.contrib.operators.postgres_to_gcs_operator import PostgresToGoogleCloudStorageOperator
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+PY3 = sys.version_info[0] == 3
+
+TASK_ID = 'test-postgres-to-gcs'
+POSTGRES_CONN_ID = 'postgres_conn_test'
+SQL = 'select 1'
+BUCKET = 'gs://test'
+FILENAME = 'test_{}.ndjson'
+# we expect the psycopg cursor to return encoded strs in py2 and decoded in py3
+if PY3:
+    ROWS = [('mock_row_content_1', 42), ('mock_row_content_2', 43), ('mock_row_content_3', 44)]
+    CURSOR_DESCRIPTION = (('some_str', 0), ('some_num', 1005))
+else:
+    ROWS = [(b'mock_row_content_1', 42), (b'mock_row_content_2', 43), (b'mock_row_content_3', 44)]
+    CURSOR_DESCRIPTION = ((b'some_str', 0), (b'some_num', 1005))
+NDJSON_LINES = [
+    b'{"some_num": 42, "some_str": "mock_row_content_1"}\n',
+    b'{"some_num": 43, "some_str": "mock_row_content_2"}\n',
+    b'{"some_num": 44, "some_str": "mock_row_content_3"}\n'
+]
+SCHEMA_FILENAME = 'schema_test.json'
+SCHEMA_JSON = b'[{"mode": "NULLABLE", "name": "some_str", "type": "STRING"}, {"mode": "REPEATED", "name": "some_num", "type": "INTEGER"}]'
+
+
+class PostgresToGoogleCloudStorageOperatorTest(unittest.TestCase):
+    def test_init(self):
+        """Test PostgresToGoogleCloudStorageOperator instance is properly initialized."""
+        op = PostgresToGoogleCloudStorageOperator(
+            task_id=TASK_ID, sql=SQL, bucket=BUCKET, filename=FILENAME)
+        self.assertEqual(op.task_id, TASK_ID)
+        self.assertEqual(op.sql, SQL)
+        self.assertEqual(op.bucket, BUCKET)
+        self.assertEqual(op.filename, FILENAME)
+
+    @mock.patch('airflow.contrib.operators.postgres_to_gcs_operator.PostgresHook')
+    @mock.patch('airflow.contrib.operators.postgres_to_gcs_operator.GoogleCloudStorageHook')
+    def test_exec_success(self, gcs_hook_mock_class, pg_hook_mock_class):
+        """Test the execute function in case where the run is successful."""
+        op = PostgresToGoogleCloudStorageOperator(
+            task_id=TASK_ID,
+            postgres_conn_id=POSTGRES_CONN_ID,
+            sql=SQL,
+            bucket=BUCKET,
+            filename=FILENAME)
+
+        pg_hook_mock = pg_hook_mock_class.return_value
+        pg_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS)
+        pg_hook_mock.get_conn().cursor().description = CURSOR_DESCRIPTION
+
+        gcs_hook_mock = gcs_hook_mock_class.return_value
+
+        def _assert_upload(bucket, obj, tmp_filename, content_type):
+            self.assertEqual(BUCKET, bucket)
+            self.assertEqual(FILENAME.format(0), obj)
+            self.assertEqual('application/json', content_type)
+            with open(tmp_filename, 'rb') as f:
+                self.assertEqual(b''.join(NDJSON_LINES), f.read())
+
+        gcs_hook_mock.upload.side_effect = _assert_upload
+
+        op.execute(None)
+
+        pg_hook_mock_class.assert_called_once_with(postgres_conn_id=POSTGRES_CONN_ID)
+        pg_hook_mock.get_conn().cursor().execute.assert_called_once_with(SQL, None)
+
+    @mock.patch('airflow.contrib.operators.postgres_to_gcs_operator.PostgresHook')
+    @mock.patch('airflow.contrib.operators.postgres_to_gcs_operator.GoogleCloudStorageHook')
+    def test_file_splitting(self, gcs_hook_mock_class, pg_hook_mock_class):
+        """Test that ndjson is split by approx_max_file_size_bytes param."""
+        pg_hook_mock = pg_hook_mock_class.return_value
+        pg_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS)
+        pg_hook_mock.get_conn().cursor().description = CURSOR_DESCRIPTION
+
+        gcs_hook_mock = gcs_hook_mock_class.return_value
+        expected_upload = {
+            FILENAME.format(0): b''.join(NDJSON_LINES[:2]),
+            FILENAME.format(1): NDJSON_LINES[2],
+        }
+
+        def _assert_upload(bucket, obj, tmp_filename, content_type):
+            self.assertEqual(BUCKET, bucket)
+            self.assertEqual('application/json', content_type)
+            with open(tmp_filename, 'rb') as f:
+                self.assertEqual(expected_upload[obj], f.read())
+
+        gcs_hook_mock.upload.side_effect = _assert_upload
+
+        op = PostgresToGoogleCloudStorageOperator(
+            task_id=TASK_ID,
+            sql=SQL,
+            bucket=BUCKET,
+            filename=FILENAME,
+            approx_max_file_size_bytes=len(expected_upload[FILENAME.format(0)]))
+        op.execute(None)
+
+    @mock.patch('airflow.contrib.operators.postgres_to_gcs_operator.PostgresHook')
+    @mock.patch('airflow.contrib.operators.postgres_to_gcs_operator.GoogleCloudStorageHook')
+    def test_schema_file(self, gcs_hook_mock_class, pg_hook_mock_class):
+        """Test writing schema files."""
+        pg_hook_mock = pg_hook_mock_class.return_value
+        pg_hook_mock.get_conn().cursor().__iter__.return_value = iter(ROWS)
+        pg_hook_mock.get_conn().cursor().description = CURSOR_DESCRIPTION
+
+        gcs_hook_mock = gcs_hook_mock_class.return_value
+
+        def _assert_upload(bucket, obj, tmp_filename, content_type):
+            if obj == SCHEMA_FILENAME:
+                with open(tmp_filename, 'rb') as f:
+                    self.assertEqual(SCHEMA_JSON, f.read())
+
+        gcs_hook_mock.upload.side_effect = _assert_upload
+
+        op = PostgresToGoogleCloudStorageOperator(
+            task_id=TASK_ID,
+            sql=SQL,
+            bucket=BUCKET,
+            filename=FILENAME,
+            schema_filename=SCHEMA_FILENAME)
+        op.execute(None)
+
+        # once for the file and once for the schema
+        self.assertEqual(2, gcs_hook_mock.upload.call_count)