This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/master by this push:
     new f60940d  Add unit test for test_sql_to_gcs (#9920)
f60940d is described below

commit f60940d3ecdb32369f4b268a177a1aef21930a8c
Author: chipmyersjr <[email protected]>
AuthorDate: Tue Jul 21 23:48:31 2020 -0700

    Add unit test for test_sql_to_gcs (#9920)
---
 .../google/cloud/transfers/test_sql_to_gcs.py      | 151 +++++++++++++++++++++
 tests/test_project_structure.py                    |   1 -
 2 files changed, 151 insertions(+), 1 deletion(-)

diff --git a/tests/providers/google/cloud/transfers/test_sql_to_gcs.py 
b/tests/providers/google/cloud/transfers/test_sql_to_gcs.py
new file mode 100644
index 0000000..b55b6f8
--- /dev/null
+++ b/tests/providers/google/cloud/transfers/test_sql_to_gcs.py
@@ -0,0 +1,151 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import json
+import unittest
+from unittest.mock import Mock
+
+import mock
+import unicodecsv as csv
+
+from airflow.providers.google.cloud.hooks.gcs import GCSHook
+from airflow.providers.google.cloud.transfers.sql_to_gcs import 
BaseSQLToGCSOperator
+
+SQL = "SELECT * FROM test_table"
+BUCKET = "TEST-BUCKET-1"
+FILENAME = "test_results.csv"
+TASK_ID = "TEST_TASK_ID"
+SCHEMA = [{"name": "column_a", "type": "3"},
+          {"name": "column_b", "type": "253"},
+          {"name": "column_c", "type": "10"}]
+COLUMNS = ["column_a", "column_b", "column_c"]
+ROW = ["convert_type_return_value", "convert_type_return_value", 
"convert_type_return_value"]
+TMP_FILE_NAME = "temp-file"
+INPUT_DATA = [["101", "school", "2015-01-01"],
+              ["102", "business", "2017-05-24"],
+              ["103", "non-profit", "2018-10-01"]]
+OUTPUT_DATA = json.dumps({
+    "column_a": "convert_type_return_value",
+    "column_b": "convert_type_return_value",
+    "column_c": "convert_type_return_value"
+}).encode("utf-8")
+SCHEMA_FILE = "schema_file.json"
+APP_JSON = "application/json"
+
+
+class DummySQLToGCSOperator(BaseSQLToGCSOperator):
+
+    def field_to_bigquery(self, field):
+        pass
+
+    def convert_type(self, value, schema_type):
+        pass
+
+    def query(self):
+        pass
+
+
+class TestBaseSQLToGCSOperator(unittest.TestCase):
+
+    
@mock.patch("airflow.providers.google.cloud.transfers.sql_to_gcs.NamedTemporaryFile")
+    @mock.patch.object(csv.writer, "writerow")
+    @mock.patch.object(GCSHook, "upload")
+    @mock.patch.object(DummySQLToGCSOperator, "query")
+    @mock.patch.object(DummySQLToGCSOperator, "field_to_bigquery")
+    @mock.patch.object(DummySQLToGCSOperator, "convert_type")
+    def test_exec(self,
+                  mock_convert_type,
+                  mock_field_to_bigquery,
+                  mock_query,
+                  mock_upload,
+                  mock_writerow,
+                  mock_tempfile):
+        cursor_mock = Mock()
+        cursor_mock.description = [("column_a", "3"), ("column_b", "253"), 
("column_c", "10")]
+        cursor_mock.__iter__ = Mock(return_value=iter(INPUT_DATA))
+        mock_query.return_value = cursor_mock
+        mock_convert_type.return_value = "convert_type_return_value"
+
+        mock_file = Mock()
+
+        mock_tell = Mock()
+        mock_tell.return_value = 3
+        mock_file.tell = mock_tell
+
+        mock_flush = Mock()
+        mock_file.flush = mock_flush
+
+        mock_close = Mock()
+        mock_file.close = mock_close
+
+        mock_file.name = TMP_FILE_NAME
+
+        mock_write = Mock()
+        mock_file.write = mock_write
+
+        mock_tempfile.return_value = mock_file
+
+        operator = DummySQLToGCSOperator(sql=SQL,
+                                         bucket=BUCKET,
+                                         filename=FILENAME,
+                                         task_id=TASK_ID,
+                                         schema_filename=SCHEMA_FILE,
+                                         approx_max_file_size_bytes=1,
+                                         export_format="csv",
+                                         gzip=True,
+                                         schema=SCHEMA,
+                                         
google_cloud_storage_conn_id='google_cloud_default')
+        operator.execute(context=dict())
+
+        mock_query.assert_called_once()
+        mock_writerow.assert_has_calls([mock.call(COLUMNS), mock.call(ROW),
+                                        mock.call(COLUMNS), mock.call(ROW),
+                                        mock.call(COLUMNS), mock.call(ROW),
+                                        mock.call(COLUMNS)])
+        mock_flush.assert_has_calls([mock.call(), mock.call(), mock.call(), 
mock.call(), mock.call()])
+        csv_call = mock.call(BUCKET, FILENAME, TMP_FILE_NAME, 
mime_type='text/csv', gzip=True)
+        json_call = mock.call(BUCKET, SCHEMA_FILE, TMP_FILE_NAME, 
mime_type=APP_JSON, gzip=False)
+        upload_calls = [csv_call, csv_call, csv_call, json_call]
+        mock_upload.assert_has_calls(upload_calls)
+        mock_close.assert_has_calls([mock.call(), mock.call(), mock.call(), 
mock.call(), mock.call()])
+
+        mock_query.reset_mock()
+        mock_flush.reset_mock()
+        mock_upload.reset_mock()
+        mock_close.reset_mock()
+        cursor_mock.reset_mock()
+
+        cursor_mock.__iter__ = Mock(return_value=iter(INPUT_DATA))
+
+        operator = DummySQLToGCSOperator(sql=SQL,
+                                         bucket=BUCKET,
+                                         filename=FILENAME,
+                                         task_id=TASK_ID,
+                                         export_format="json",
+                                         schema=SCHEMA)
+        operator.execute(context=dict())
+
+        mock_query.assert_called_once()
+        mock_write.assert_has_calls([mock.call(OUTPUT_DATA),
+                                     mock.call(b"\n"),
+                                     mock.call(OUTPUT_DATA),
+                                     mock.call(b"\n"),
+                                     mock.call(OUTPUT_DATA),
+                                     mock.call(b"\n")])
+        mock_flush.assert_called_once()
+        mock_upload.assert_called_once_with(BUCKET, FILENAME, TMP_FILE_NAME, 
mime_type=APP_JSON, gzip=False)
+        mock_close.assert_called_once()
diff --git a/tests/test_project_structure.py b/tests/test_project_structure.py
index b1568c3..1795b83 100644
--- a/tests/test_project_structure.py
+++ b/tests/test_project_structure.py
@@ -30,7 +30,6 @@ ROOT_FOLDER = os.path.realpath(
 MISSING_TEST_FILES = {
     'tests/providers/google/cloud/log/test_gcs_task_handler.py',
     'tests/providers/google/cloud/operators/test_datastore.py',
-    'tests/providers/google/cloud/transfers/test_sql_to_gcs.py',
     'tests/providers/google/cloud/utils/test_field_sanitizer.py',
     'tests/providers/google/cloud/utils/test_field_validator.py',
     'tests/providers/google/cloud/utils/test_mlengine_prediction_summary.py',

Reply via email to