Repository: incubator-airflow Updated Branches: refs/heads/master 8e5805399 -> 52c745da7
[AIRFLOW-2596] Add Oracle to Azure Datalake Transfer Operator Closes #3613 from marcusrehm/oracle_to_azure_datalake_transfer Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/52c745da Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/52c745da Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/52c745da Branch: refs/heads/master Commit: 52c745da71a6da798f7322956967b5e818b56e48 Parents: 8e58053 Author: Marcus Rehm <[email protected]> Authored: Fri Jul 20 22:46:59 2018 +0200 Committer: Fokko Driesprong <[email protected]> Committed: Fri Jul 20 22:46:59 2018 +0200 ---------------------------------------------------------------------- .../oracle_to_azure_data_lake_transfer.py | 113 ++++++++++++++++ docs/code.rst | 1 + .../test_oracle_to_azure_data_lake_transfer.py | 135 +++++++++++++++++++ 3 files changed, 249 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/52c745da/airflow/contrib/operators/oracle_to_azure_data_lake_transfer.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/oracle_to_azure_data_lake_transfer.py b/airflow/contrib/operators/oracle_to_azure_data_lake_transfer.py new file mode 100644 index 0000000..06a3998 --- /dev/null +++ b/airflow/contrib/operators/oracle_to_azure_data_lake_transfer.py @@ -0,0 +1,113 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. 
from airflow.hooks.oracle_hook import OracleHook
from airflow.contrib.hooks.azure_data_lake_hook import AzureDataLakeHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
from airflow.utils.file import TemporaryDirectory

import unicodecsv as csv
import os


class OracleToAzureDataLakeTransfer(BaseOperator):
    """
    Moves data from Oracle to Azure Data Lake.

    The operator runs the given SQL query against Oracle, dumps the result
    set to a local CSV file inside a temporary directory, and then uploads
    that file to the given Azure Data Lake path.

    :param filename: file name to be used by the csv file.
    :type filename: str
    :param azure_data_lake_conn_id: destination azure data lake connection.
    :type azure_data_lake_conn_id: str
    :param azure_data_lake_path: destination path in azure data lake to put the file.
    :type azure_data_lake_path: str
    :param oracle_conn_id: source Oracle connection.
    :type oracle_conn_id: str
    :param sql: SQL query to execute against the Oracle database. (templated)
    :type sql: str
    :param sql_params: Parameters to use in sql query. (templated)
    :type sql_params: dict
    :param delimiter: field delimiter in the file.
    :type delimiter: str
    :param encoding: encoding type for the file.
    :type encoding: str
    :param quotechar: Character to use in quoting.
    :type quotechar: str
    :param quoting: Quoting strategy. See unicodecsv quoting for more information.
    :type quoting: int
    """

    template_fields = ('filename', 'sql', 'sql_params')
    ui_color = '#e08c8c'

    @apply_defaults
    def __init__(
            self,
            filename,
            azure_data_lake_conn_id,
            azure_data_lake_path,
            oracle_conn_id,
            sql,
            sql_params=None,
            delimiter=",",
            encoding="utf-8",
            quotechar='"',
            quoting=csv.QUOTE_MINIMAL,
            *args, **kwargs):
        super(OracleToAzureDataLakeTransfer, self).__init__(*args, **kwargs)
        # None sentinel instead of a mutable default ({}): a dict default
        # would be evaluated once and shared across every operator instance.
        if sql_params is None:
            sql_params = {}
        self.filename = filename
        self.oracle_conn_id = oracle_conn_id
        self.sql = sql
        self.sql_params = sql_params
        self.azure_data_lake_conn_id = azure_data_lake_conn_id
        self.azure_data_lake_path = azure_data_lake_path
        self.delimiter = delimiter
        self.encoding = encoding
        self.quotechar = quotechar
        self.quoting = quoting

    def _write_temp_file(self, cursor, path_to_save):
        """Dump the cursor's result set to ``path_to_save`` as a CSV file.

        The first row written is the column names taken from
        ``cursor.description``; each following row is one result record.
        """
        with open(path_to_save, 'wb') as csvfile:
            csv_writer = csv.writer(csvfile, delimiter=self.delimiter,
                                    encoding=self.encoding, quotechar=self.quotechar,
                                    quoting=self.quoting)
            csv_writer.writerow(map(lambda field: field[0], cursor.description))
            csv_writer.writerows(cursor)
            csvfile.flush()

    def execute(self, context):
        """Run the query, write the result locally, upload to Azure Data Lake."""
        oracle_hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
        azure_data_lake_hook = AzureDataLakeHook(
            azure_data_lake_conn_id=self.azure_data_lake_conn_id)

        self.log.info("Dumping Oracle query results to local file")
        conn = oracle_hook.get_conn()
        cursor = conn.cursor()
        try:
            cursor.execute(self.sql, self.sql_params)
            with TemporaryDirectory(prefix='airflow_oracle_to_azure_op_') as temp:
                self._write_temp_file(cursor, os.path.join(temp, self.filename))
                self.log.info("Uploading local file to Azure Data Lake")
                azure_data_lake_hook.upload_file(
                    os.path.join(temp, self.filename),
                    os.path.join(self.azure_data_lake_path, self.filename))
        finally:
            # Release DB resources even if the dump or the upload raises;
            # the original leaked cursor and connection on any failure.
            cursor.close()
            conn.close()
--git a/docs/code.rst b/docs/code.rst index 21b5b1d..4f1b301 100644 --- a/docs/code.rst +++ b/docs/code.rst @@ -171,6 +171,7 @@ Operators .. autoclass:: airflow.contrib.operators.mlengine_operator.MLEngineTrainingOperator .. autoclass:: airflow.contrib.operators.mongo_to_s3.MongoToS3Operator .. autoclass:: airflow.contrib.operators.mysql_to_gcs.MySqlToGoogleCloudStorageOperator +.. autoclass:: airflow.contrib.operators.oracle_to_azure_data_lake_transfer.OracleToAzureDataLakeTransfer .. autoclass:: airflow.contrib.operators.postgres_to_gcs_operator.PostgresToGoogleCloudStorageOperator .. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubTopicCreateOperator .. autoclass:: airflow.contrib.operators.pubsub_operator.PubSubTopicDeleteOperator http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/52c745da/tests/contrib/operators/test_oracle_to_azure_data_lake_transfer.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_oracle_to_azure_data_lake_transfer.py b/tests/contrib/operators/test_oracle_to_azure_data_lake_transfer.py new file mode 100644 index 0000000..e02e631 --- /dev/null +++ b/tests/contrib/operators/test_oracle_to_azure_data_lake_transfer.py @@ -0,0 +1,135 @@ +# -*- coding: utf-8 -*- +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. 
import unittest
from airflow.contrib.operators.oracle_to_azure_data_lake_transfer \
    import OracleToAzureDataLakeTransfer
from airflow.utils.file import TemporaryDirectory
import unicodecsv as csv
import os

try:
    from unittest import mock
    from unittest.mock import MagicMock
except ImportError:
    try:
        import mock
        from mock import MagicMock
    except ImportError:
        mock = None


class OracleToAzureDataLakeTransferTest(unittest.TestCase):
    """Unit tests for the OracleToAzureDataLakeTransfer operator."""

    mock_module_path = 'airflow.contrib.operators.oracle_to_azure_data_lake_transfer'

    def test_write_temp_file(self):
        """The CSV dump contains a header row followed by the cursor rows."""
        task_id = "some_test_id"
        sql = "some_sql"
        sql_params = {':p_data': "2018-01-01"}
        oracle_conn_id = "oracle_conn_id"
        filename = "some_filename"
        azure_data_lake_conn_id = 'azure_data_lake_conn_id'
        azure_data_lake_path = 'azure_data_lake_path'
        delimiter = '|'
        encoding = 'utf-8'
        cursor_description = [
            ('id', "<class 'cx_Oracle.NUMBER'>", 39, None, 38, 0, 0),
            ('description', "<class 'cx_Oracle.STRING'>", 60, 240, None, None, 1)
        ]
        cursor_rows = [[1, 'description 1'], [2, 'description 2']]
        mock_cursor = MagicMock()
        mock_cursor.description = cursor_description
        mock_cursor.__iter__.return_value = cursor_rows

        op = OracleToAzureDataLakeTransfer(
            task_id=task_id,
            filename=filename,
            oracle_conn_id=oracle_conn_id,
            sql=sql,
            sql_params=sql_params,
            azure_data_lake_conn_id=azure_data_lake_conn_id,
            azure_data_lake_path=azure_data_lake_path,
            delimiter=delimiter,
            encoding=encoding)

        with TemporaryDirectory(prefix='airflow_oracle_to_azure_op_') as temp:
            op._write_temp_file(mock_cursor, os.path.join(temp, filename))

            # assertTrue instead of the original `== 1` truthiness comparison.
            self.assertTrue(os.path.exists(os.path.join(temp, filename)))

            with open(os.path.join(temp, filename), 'rb') as csvfile:
                temp_file = csv.reader(csvfile, delimiter=delimiter,
                                       encoding=encoding)
                for rownum, row in enumerate(temp_file):
                    if rownum == 0:
                        self.assertEqual(row[0], 'id')
                        self.assertEqual(row[1], 'description')
                    else:
                        self.assertEqual(row[0], str(cursor_rows[rownum - 1][0]))
                        self.assertEqual(row[1], cursor_rows[rownum - 1][1])

    @mock.patch(mock_module_path + '.OracleHook',
                autospec=True)
    @mock.patch(mock_module_path + '.AzureDataLakeHook',
                autospec=True)
    def test_execute(self, mock_data_lake_hook, mock_oracle_hook):
        """execute() queries Oracle and uploads the dump to Azure Data Lake."""
        task_id = "some_test_id"
        sql = "some_sql"
        sql_params = {':p_data': "2018-01-01"}
        oracle_conn_id = "oracle_conn_id"
        filename = "some_filename"
        azure_data_lake_conn_id = 'azure_data_lake_conn_id'
        azure_data_lake_path = 'azure_data_lake_path'
        delimiter = '|'
        encoding = 'latin-1'
        cursor_description = [
            ('id', "<class 'cx_Oracle.NUMBER'>", 39, None, 38, 0, 0),
            ('description', "<class 'cx_Oracle.STRING'>", 60, 240, None, None, 1)
        ]
        cursor_rows = [[1, 'description 1'], [2, 'description 2']]
        mock_cursor = MagicMock()
        # ``description`` is a plain attribute on a DB-API cursor -- assign it
        # directly.  The original ``description.return_value = ...`` configured
        # the return of *calling* the attribute, leaving ``description`` itself
        # an auto-created MagicMock.
        mock_cursor.description = cursor_description
        mock_cursor.__iter__.return_value = cursor_rows
        mock_oracle_conn = MagicMock()
        # Configure ``return_value`` on the method mock; the original called
        # ``cursor()`` first, configuring a different (throwaway) child mock.
        mock_oracle_conn.cursor.return_value = mock_cursor
        # The patch replaces the hook *class*; the instance the operator gets
        # back is ``mock_oracle_hook.return_value``.
        mock_oracle_hook.return_value.get_conn.return_value = mock_oracle_conn

        op = OracleToAzureDataLakeTransfer(
            task_id=task_id,
            filename=filename,
            oracle_conn_id=oracle_conn_id,
            sql=sql,
            sql_params=sql_params,
            azure_data_lake_conn_id=azure_data_lake_conn_id,
            azure_data_lake_path=azure_data_lake_path,
            delimiter=delimiter,
            encoding=encoding)

        op.execute(None)

        mock_oracle_hook.assert_called_once_with(oracle_conn_id=oracle_conn_id)
        mock_data_lake_hook.assert_called_once_with(
            azure_data_lake_conn_id=azure_data_lake_conn_id)
        # With the mocks wired correctly we can now also assert the operator
        # actually ran the query and uploaded the dump to the expected path.
        mock_cursor.execute.assert_called_once_with(sql, sql_params)
        mock_data_lake_hook.return_value.upload_file.assert_called_once_with(
            mock.ANY, os.path.join(azure_data_lake_path, filename))


if __name__ == '__main__':
    unittest.main()
