mik-laj commented on a change in pull request #4792: [AIRFLOW-3659] Create
Google Cloud Transfer Service Operators
URL: https://github.com/apache/airflow/pull/4792#discussion_r261448256
##########
File path: tests/contrib/hooks/test_gcp_transfer_hook.py
##########
@@ -18,125 +18,487 @@
# under the License.
#
import json
-import datetime
import unittest
+from copy import deepcopy
-from airflow.exceptions import AirflowException
-from airflow.contrib.hooks.gcp_transfer_hook import GCPTransferServiceHook
-from airflow.contrib.hooks.gcp_transfer_hook import TIME_TO_SLEEP_IN_SECONDS
+from airflow import AirflowException
+from airflow.contrib.hooks.gcp_transfer_hook import (
+ GCPTransferServiceHook,
+ TIME_TO_SLEEP_IN_SECONDS,
+ GcpTransferOperationStatus,
+)
+from tests.contrib.utils.base_gcp_mock import (
+ mock_base_gcp_hook_no_default_project_id,
+ mock_base_gcp_hook_default_project_id,
+)
try:
from unittest import mock
-except ImportError:
+except ImportError: # pragma: no cover
try:
import mock
except ImportError:
mock = None
+PROJECT_ID = 'project-id'
+BODY = {'description': 'AAA', 'project_id': PROJECT_ID}
-class TestGCPTransferServiceHook(unittest.TestCase):
+TRANSFER_JOB_NAME = "transfer-job"
+TRANSFER_OPERATION_NAME = "transfer-operation"
+
+TRANSFER_JOB = {"name": TRANSFER_JOB_NAME}
+TRANSFER_OPERATION = {"name": TRANSFER_OPERATION_NAME}
+
+TRANSFER_JOB_FILTER = {'project_id': 'project-id', 'job_names':
[TRANSFER_JOB_NAME]}
+TRANSFER_OPERATION_FILTER = {'project_id': 'project-id', 'job_names':
[TRANSFER_JOB_NAME]}
+UPDATE_TRANSFER_JOB_BODY = {
+ "transferJob": {'description': 'description-1'},
+ 'project_id': PROJECT_ID,
+ "update_transfer_job_field_mask": 'description',
+}
+
+
+class TestGCPTransferServiceHookWithPassedProjectId(unittest.TestCase):
def setUp(self):
- with mock.patch.object(GCPTransferServiceHook, '__init__',
return_value=None):
- self.conn = mock.Mock()
- self.transfer_hook = GCPTransferServiceHook()
- self.transfer_hook._conn = self.conn
-
- def test_create_transfer_job(self):
- mock_create = self.conn.transferJobs.return_value.create
- mock_execute = mock_create.return_value.execute
- mock_execute.return_value = {
- 'projectId': 'test-project',
- 'name': 'transferJobs/test-job',
- }
- now = datetime.datetime.utcnow()
- transfer_spec = {
- 'awsS3DataSource': {'bucketName': 'test-s3-bucket'},
- 'gcsDataSink': {'bucketName': 'test-gcs-bucket'}
- }
- self.transfer_hook.create_transfer_job(
- project_id='test-project',
- description='test-description',
- schedule=None,
- transfer_spec=transfer_spec)
- mock_create.assert_called_once_with(body={
- 'status': 'ENABLED',
- 'projectId': 'test-project',
- 'description': 'test-description',
- 'transferSpec': transfer_spec,
- 'schedule': {
- 'scheduleStartDate': {
- 'day': now.day,
- 'month': now.month,
- 'year': now.year,
- },
- 'scheduleEndDate': {
- 'day': now.day,
- 'month': now.month,
- 'year': now.year,
- }
- }
- })
-
- def test_create_transfer_job_custom_schedule(self):
- mock_create = self.conn.transferJobs.return_value.create
- mock_execute = mock_create.return_value.execute
- mock_execute.return_value = {
- 'projectId': 'test-project',
- 'name': 'transferJobs/test-job',
- }
- schedule = {
- 'scheduleStartDate': {'month': 10, 'day': 1, 'year': 2018},
- 'scheduleEndDate': {'month': 10, 'day': 31, 'year': 2018},
- }
- transfer_spec = {
- 'awsS3DataSource': {'bucketName': 'test-s3-bucket'},
- 'gcsDataSink': {'bucketName': 'test-gcs-bucket'}
- }
- self.transfer_hook.create_transfer_job(
- project_id='test-project',
- description='test-description',
- schedule=schedule,
- transfer_spec=transfer_spec)
- mock_create.assert_called_once_with(body={
- 'status': 'ENABLED',
- 'projectId': 'test-project',
- 'description': 'test-description',
- 'transferSpec': transfer_spec,
- 'schedule': schedule,
- })
+ with mock.patch(
+ 'airflow.contrib.hooks.'
'gcp_api_base_hook.GoogleCloudBaseHook.__init__',
+ new=mock_base_gcp_hook_no_default_project_id,
+ ):
+ self.gct_hook = GCPTransferServiceHook(gcp_conn_id='test')
+
+ @mock.patch('airflow.contrib.hooks.gcp_transfer_hook.'
'GCPTransferServiceHook.get_conn')
+ def test_create_transfer_job(self, get_conn):
+ create_method = get_conn.return_value.transferJobs.return_value.create
+ execute_method = create_method.return_value.execute
+ execute_method.return_value = TRANSFER_JOB
+ res = self.gct_hook.create_transfer_job(body=BODY)
+ self.assertEqual(res, TRANSFER_JOB)
+ create_method.assert_called_once_with(body=BODY)
+ execute_method.assert_called_once_with(num_retries=5)
+
+ @mock.patch('airflow.contrib.hooks.gcp_transfer_hook.'
'GCPTransferServiceHook.get_conn')
+ def test_get_transfer_job(self, get_conn):
+ get_method = get_conn.return_value.transferJobs.return_value.get
+ execute_method = get_method.return_value.execute
+ execute_method.return_value = TRANSFER_JOB
+ res = self.gct_hook.get_transfer_job(job_name=TRANSFER_JOB_NAME,
project_id=PROJECT_ID)
+ self.assertIsNotNone(res)
+ self.assertEqual(TRANSFER_JOB_NAME, res['name'])
+ get_method.assert_called_once_with(jobName=TRANSFER_JOB_NAME,
projectId=PROJECT_ID)
+ execute_method.assert_called_once_with(num_retries=5)
+
+ @mock.patch('airflow.contrib.hooks.gcp_transfer_hook.'
'GCPTransferServiceHook.get_conn')
+ def test_list_transfer_job(self, get_conn):
Review comment:
While creating this operator, we discovered that similar tests were missing
for other operators. We wanted to unify the code and fill in those gaps across
the whole project. We will add tests for this case now.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services