Repository: incubator-airflow
Updated Branches:
  refs/heads/master 8e5805399 -> 52c745da7


[AIRFLOW-2596] Add Oracle to Azure Datalake Transfer Operator

Closes #3613 from
marcusrehm/oracle_to_azure_datalake_transfer


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/52c745da
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/52c745da
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/52c745da

Branch: refs/heads/master
Commit: 52c745da71a6da798f7322956967b5e818b56e48
Parents: 8e58053
Author: Marcus Rehm <[email protected]>
Authored: Fri Jul 20 22:46:59 2018 +0200
Committer: Fokko Driesprong <[email protected]>
Committed: Fri Jul 20 22:46:59 2018 +0200

----------------------------------------------------------------------
 .../oracle_to_azure_data_lake_transfer.py       | 113 ++++++++++++++++
 docs/code.rst                                   |   1 +
 .../test_oracle_to_azure_data_lake_transfer.py  | 135 +++++++++++++++++++
 3 files changed, 249 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/52c745da/airflow/contrib/operators/oracle_to_azure_data_lake_transfer.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/oracle_to_azure_data_lake_transfer.py 
b/airflow/contrib/operators/oracle_to_azure_data_lake_transfer.py
new file mode 100644
index 0000000..06a3998
--- /dev/null
+++ b/airflow/contrib/operators/oracle_to_azure_data_lake_transfer.py
@@ -0,0 +1,113 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from airflow.hooks.oracle_hook import OracleHook
+from airflow.contrib.hooks.azure_data_lake_hook import AzureDataLakeHook
+from airflow.models import BaseOperator
+from airflow.utils.decorators import apply_defaults
+from airflow.utils.file import TemporaryDirectory
+
+import unicodecsv as csv
+import os
+
+
class OracleToAzureDataLakeTransfer(BaseOperator):
    """
    Moves data from Oracle to Azure Data Lake. The operator runs the query
    against Oracle and stores the file locally before loading it into
    Azure Data Lake.

    :param filename: file name to be used by the csv file.
    :type filename: str
    :param azure_data_lake_conn_id: destination azure data lake connection.
    :type azure_data_lake_conn_id: str
    :param azure_data_lake_path: destination path in azure data lake to put
        the file.
    :type azure_data_lake_path: str
    :param oracle_conn_id: source Oracle connection.
    :type oracle_conn_id: str
    :param sql: SQL query to execute against the Oracle database. (templated)
    :type sql: str
    :param sql_params: Parameters to use in sql query. (templated)
    :type sql_params: dict
    :param delimiter: field delimiter in the file.
    :type delimiter: str
    :param encoding: encoding type for the file.
    :type encoding: str
    :param quotechar: Character to use in quoting.
    :type quotechar: str
    :param quoting: Quoting strategy. See unicodecsv quoting for more
        information.
    :type quoting: str
    """

    template_fields = ('filename', 'sql', 'sql_params')
    ui_color = '#e08c8c'

    @apply_defaults
    def __init__(
            self,
            filename,
            azure_data_lake_conn_id,
            azure_data_lake_path,
            oracle_conn_id,
            sql,
            sql_params=None,
            delimiter=",",
            encoding="utf-8",
            quotechar='"',
            quoting=csv.QUOTE_MINIMAL,
            *args, **kwargs):
        super(OracleToAzureDataLakeTransfer, self).__init__(*args, **kwargs)
        if sql_params is None:
            # Use a fresh dict per instance; a ``{}`` default would be shared
            # (and mutable) across every instantiation of the operator.
            sql_params = {}
        self.filename = filename
        self.oracle_conn_id = oracle_conn_id
        self.sql = sql
        self.sql_params = sql_params
        self.azure_data_lake_conn_id = azure_data_lake_conn_id
        self.azure_data_lake_path = azure_data_lake_path
        self.delimiter = delimiter
        self.encoding = encoding
        self.quotechar = quotechar
        self.quoting = quoting

    def _write_temp_file(self, cursor, path_to_save):
        """Dump the cursor's header and all result rows to a local csv file."""
        with open(path_to_save, 'wb') as csvfile:
            csv_writer = csv.writer(csvfile, delimiter=self.delimiter,
                                    encoding=self.encoding,
                                    quotechar=self.quotechar,
                                    quoting=self.quoting)
            # First row: column names taken from the DB-API cursor description.
            csv_writer.writerow(map(lambda field: field[0],
                                    cursor.description))
            csv_writer.writerows(cursor)
            csvfile.flush()

    def execute(self, context):
        oracle_hook = OracleHook(oracle_conn_id=self.oracle_conn_id)
        azure_data_lake_hook = AzureDataLakeHook(
            azure_data_lake_conn_id=self.azure_data_lake_conn_id)

        self.log.info("Dumping Oracle query results to local file")
        conn = oracle_hook.get_conn()
        cursor = conn.cursor()
        try:
            cursor.execute(self.sql, self.sql_params)
            with TemporaryDirectory(
                    prefix='airflow_oracle_to_azure_op_') as temp:
                self._write_temp_file(cursor, os.path.join(temp, self.filename))
                self.log.info("Uploading local file to Azure Data Lake")
                azure_data_lake_hook.upload_file(
                    os.path.join(temp, self.filename),
                    os.path.join(self.azure_data_lake_path, self.filename))
        finally:
            # Release database resources even when the query or upload fails;
            # the original flow leaked the cursor/connection on any exception.
            cursor.close()
            conn.close()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/52c745da/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index 21b5b1d..4f1b301 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -171,6 +171,7 @@ Operators
 .. autoclass:: 
airflow.contrib.operators.mlengine_operator.MLEngineTrainingOperator
 .. autoclass:: airflow.contrib.operators.mongo_to_s3.MongoToS3Operator
 .. autoclass:: 
airflow.contrib.operators.mysql_to_gcs.MySqlToGoogleCloudStorageOperator
+.. autoclass:: 
airflow.contrib.operators.oracle_to_azure_data_lake_transfer.OracleToAzureDataLakeTransfer
 .. autoclass:: 
airflow.contrib.operators.postgres_to_gcs_operator.PostgresToGoogleCloudStorageOperator
 .. autoclass:: 
airflow.contrib.operators.pubsub_operator.PubSubTopicCreateOperator
 .. autoclass:: 
airflow.contrib.operators.pubsub_operator.PubSubTopicDeleteOperator

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/52c745da/tests/contrib/operators/test_oracle_to_azure_data_lake_transfer.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_oracle_to_azure_data_lake_transfer.py 
b/tests/contrib/operators/test_oracle_to_azure_data_lake_transfer.py
new file mode 100644
index 0000000..e02e631
--- /dev/null
+++ b/tests/contrib/operators/test_oracle_to_azure_data_lake_transfer.py
@@ -0,0 +1,135 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+from airflow.contrib.operators.oracle_to_azure_data_lake_transfer \
+    import OracleToAzureDataLakeTransfer
+from airflow.utils.file import TemporaryDirectory
+import unicodecsv as csv
+import os
+
+try:
+    from unittest import mock
+    from unittest.mock import MagicMock
+except ImportError:
+    try:
+        import mock
+        from mock import MagicMock
+    except ImportError:
+        mock = None
+
+
class OracleToAzureDataLakeTransferTest(unittest.TestCase):
    """Unit tests for the OracleToAzureDataLakeTransfer operator."""

    mock_module_path = \
        'airflow.contrib.operators.oracle_to_azure_data_lake_transfer'

    def test_write_temp_file(self):
        """The csv dump must contain the header row plus every cursor row."""
        task_id = "some_test_id"
        sql = "some_sql"
        sql_params = {':p_data': "2018-01-01"}
        oracle_conn_id = "oracle_conn_id"
        filename = "some_filename"
        azure_data_lake_conn_id = 'azure_data_lake_conn_id'
        azure_data_lake_path = 'azure_data_lake_path'
        delimiter = '|'
        encoding = 'utf-8'
        cursor_description = [
            ('id', "<class 'cx_Oracle.NUMBER'>", 39, None, 38, 0, 0),
            ('description', "<class 'cx_Oracle.STRING'>", 60, 240,
             None, None, 1)
        ]
        cursor_rows = [[1, 'description 1'], [2, 'description 2']]
        # ``description`` is an attribute on DB-API cursors (not a method)
        # and iterating the cursor yields the result rows.
        mock_cursor = MagicMock()
        mock_cursor.description = cursor_description
        mock_cursor.__iter__.return_value = cursor_rows

        op = OracleToAzureDataLakeTransfer(
            task_id=task_id,
            filename=filename,
            oracle_conn_id=oracle_conn_id,
            sql=sql,
            sql_params=sql_params,
            azure_data_lake_conn_id=azure_data_lake_conn_id,
            azure_data_lake_path=azure_data_lake_path,
            delimiter=delimiter,
            encoding=encoding)

        with TemporaryDirectory(prefix='airflow_oracle_to_azure_op_') as temp:
            path = os.path.join(temp, filename)
            op._write_temp_file(mock_cursor, path)

            self.assertTrue(os.path.exists(path))

            with open(path, 'rb') as csvfile:
                reader = csv.reader(csvfile, delimiter=delimiter,
                                    encoding=encoding)
                for rownum, row in enumerate(reader):
                    if rownum == 0:
                        # Header row comes from the cursor description.
                        self.assertEqual(row[0], 'id')
                        self.assertEqual(row[1], 'description')
                    else:
                        self.assertEqual(
                            row[0], str(cursor_rows[rownum - 1][0]))
                        self.assertEqual(row[1], cursor_rows[rownum - 1][1])

    @mock.patch(mock_module_path + '.OracleHook',
                autospec=True)
    @mock.patch(mock_module_path + '.AzureDataLakeHook',
                autospec=True)
    def test_execute(self, mock_data_lake_hook, mock_oracle_hook):
        """execute() must query Oracle and upload the dump to the lake."""
        task_id = "some_test_id"
        sql = "some_sql"
        sql_params = {':p_data': "2018-01-01"}
        oracle_conn_id = "oracle_conn_id"
        filename = "some_filename"
        azure_data_lake_conn_id = 'azure_data_lake_conn_id'
        azure_data_lake_path = 'azure_data_lake_path'
        delimiter = '|'
        encoding = 'latin-1'
        cursor_description = [
            ('id', "<class 'cx_Oracle.NUMBER'>", 39, None, 38, 0, 0),
            ('description', "<class 'cx_Oracle.STRING'>", 60, 240,
             None, None, 1)
        ]
        cursor_rows = [[1, 'description 1'], [2, 'description 2']]
        mock_cursor = MagicMock()
        mock_cursor.description = cursor_description
        mock_cursor.__iter__.return_value = cursor_rows
        mock_oracle_conn = MagicMock()
        # Wire the mocks up the way the operator actually uses them:
        # OracleHook(...).get_conn() -> conn, then conn.cursor() -> cursor.
        # The original test configured ``cursor().return_value`` and
        # ``get_conn().return_value``, which set attributes on the wrong
        # mock objects, so the operator ran against fresh auto-created
        # MagicMocks and nothing below would have detected a regression.
        mock_oracle_conn.cursor.return_value = mock_cursor
        mock_oracle_hook.return_value.get_conn.return_value = mock_oracle_conn

        op = OracleToAzureDataLakeTransfer(
            task_id=task_id,
            filename=filename,
            oracle_conn_id=oracle_conn_id,
            sql=sql,
            sql_params=sql_params,
            azure_data_lake_conn_id=azure_data_lake_conn_id,
            azure_data_lake_path=azure_data_lake_path,
            delimiter=delimiter,
            encoding=encoding)

        op.execute(None)

        mock_oracle_hook.assert_called_once_with(oracle_conn_id=oracle_conn_id)
        mock_data_lake_hook.assert_called_once_with(
            azure_data_lake_conn_id=azure_data_lake_conn_id)
        # The query must run with the templated parameters and the dumped
        # file must be uploaded exactly once.
        mock_cursor.execute.assert_called_once_with(sql, sql_params)
        self.assertEqual(
            mock_data_lake_hook.return_value.upload_file.call_count, 1)
        # Database resources are released after the transfer.
        mock_cursor.close.assert_called_once_with()
        mock_oracle_conn.close.assert_called_once_with()


if __name__ == '__main__':
    unittest.main()

Reply via email to