Repository: incubator-airflow Updated Branches: refs/heads/master 74c1ce254 -> 219c50641
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/operators/test_sqoop_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_sqoop_operator.py b/tests/contrib/operators/test_sqoop_operator.py new file mode 100644 index 0000000..a46dc93 --- /dev/null +++ b/tests/contrib/operators/test_sqoop_operator.py @@ -0,0 +1,93 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import datetime +import unittest + +from airflow import DAG, configuration +from airflow.contrib.operators.sqoop_operator import SqoopOperator + + +class TestSqoopOperator(unittest.TestCase): + _config = { + 'cmd_type': 'export', + 'table': 'target_table', + 'query': 'SELECT * FROM schema.table', + 'target_dir': '/path/on/hdfs/to/import', + 'append': True, + 'file_type': 'avro', + 'columns': 'a,b,c', + 'num_mappers': 22, + 'split_by': 'id', + 'export_dir': '/path/on/hdfs/to/export', + 'input_null_string': '\n', + 'input_null_non_string': '\t', + 'staging_table': 'target_table_staging', + 'clear_staging_table': True, + 'enclosed_by': '"', + 'escaped_by': '\\', + 'input_fields_terminated_by': '|', + 'input_lines_terminated_by': '\n', + 'input_optionally_enclosed_by': '"', + 'batch': True, + 'relaxed_isolation': True, + 'direct': True, + 'driver': 'com.microsoft.jdbc.sqlserver.SQLServerDriver', + 'properties': { + 'mapred.map.max.attempts': '1' + } + } + + def setUp(self): + configuration.load_test_config() + args = { + 'owner': 'airflow', + 'start_date': datetime.datetime(2017, 1, 1) + } + self.dag = DAG('test_dag_id', default_args=args) + + def test_execute(self, conn_id='sqoop_default'): + operator = SqoopOperator( + task_id='sqoop_job', + dag=self.dag, + **self._config + ) + + self.assertEqual(conn_id, operator.conn_id) + + self.assertEqual(self._config['cmd_type'], operator.cmd_type) + self.assertEqual(self._config['table'], operator.table) + self.assertEqual(self._config['target_dir'], operator.target_dir) + self.assertEqual(self._config['append'], operator.append) + self.assertEqual(self._config['file_type'], operator.file_type) + self.assertEqual(self._config['num_mappers'], operator.num_mappers) + self.assertEqual(self._config['split_by'], operator.split_by) + self.assertEqual(self._config['input_null_string'], + operator.input_null_string) + self.assertEqual(self._config['input_null_non_string'], + operator.input_null_non_string) + self.assertEqual(self._config['staging_table'], operator.staging_table) + self.assertEqual(self._config['clear_staging_table'], + operator.clear_staging_table) + self.assertEqual(self._config['batch'], operator.batch) + self.assertEqual(self._config['relaxed_isolation'], + operator.relaxed_isolation) + self.assertEqual(self._config['direct'], operator.direct) + self.assertEqual(self._config['driver'], operator.driver) + self.assertEqual(self._config['properties'], operator.properties) + + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/operators/test_ssh_execute_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_ssh_execute_operator.py b/tests/contrib/operators/test_ssh_execute_operator.py new file mode 100644 index 0000000..0c2b9f2 --- /dev/null +++ b/tests/contrib/operators/test_ssh_execute_operator.py @@ -0,0 +1,95 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import os +import sys +from datetime import datetime +from io import StringIO + +import mock + +from airflow import configuration +from airflow.settings import Session +from airflow import models, DAG +from airflow.contrib.operators.ssh_execute_operator import SSHExecuteOperator + + +TEST_DAG_ID = 'unit_tests' +DEFAULT_DATE = datetime(2015, 1, 1) +configuration.load_test_config() + + +def reset(dag_id=TEST_DAG_ID): + session = Session() + tis = session.query(models.TaskInstance).filter_by(dag_id=dag_id) + tis.delete() + session.commit() + session.close() + +reset() + + +class SSHExecuteOperatorTest(unittest.TestCase): + + def setUp(self): + + if sys.version_info[0] == 3: + raise unittest.SkipTest('SSHExecuteOperatorTest won\'t work with ' + 'python3. No need to test anything here') + + configuration.load_test_config() + from airflow.contrib.hooks.ssh_hook import SSHHook + hook = mock.MagicMock(spec=SSHHook) + hook.no_host_key_check = True + hook.Popen.return_value.stdout = StringIO(u'stdout') + hook.Popen.return_value.returncode = False + args = { + 'owner': 'airflow', + 'start_date': DEFAULT_DATE, + 'provide_context': True + } + dag = DAG(TEST_DAG_ID+'test_schedule_dag_once', default_args=args) + dag.schedule_interval = '@once' + self.hook = hook + self.dag = dag + + @mock.patch('airflow.contrib.operators.ssh_execute_operator.SSHTempFileContent') + def test_simple(self, temp_file): + temp_file.return_value.__enter__ = lambda x: 'filepath' + task = SSHExecuteOperator( + task_id="test", + bash_command="echo airflow", + ssh_hook=self.hook, + dag=self.dag, + ) + task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + + @mock.patch('airflow.contrib.operators.ssh_execute_operator.SSHTempFileContent') + def test_with_env(self, temp_file): + temp_file.return_value.__enter__ = lambda x: 'filepath' + test_env = os.environ.copy() + test_env['AIRFLOW_test'] = "test" + task = SSHExecuteOperator( + task_id="test", + bash_command="echo $AIRFLOW_HOME", + ssh_hook=self.hook, + env=test_env['AIRFLOW_test'], + dag=self.dag, + ) + task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE, ignore_ti_state=True) + + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/datadog_sensor.py ---------------------------------------------------------------------- diff --git a/tests/contrib/sensors/datadog_sensor.py b/tests/contrib/sensors/datadog_sensor.py deleted file mode 100644 index 4d601e1..0000000 --- a/tests/contrib/sensors/datadog_sensor.py +++ /dev/null @@ -1,91 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest -from mock import patch - -from airflow.contrib.sensors.datadog_sensor import DatadogSensor - - -at_least_one_event = [{'alert_type': 'info', - 'comments': [], - 'date_happened': 1419436860, - 'device_name': None, - 'host': None, - 'id': 2603387619536318140, - 'is_aggregate': False, - 'priority': 'normal', - 'resource': '/api/v1/events/2603387619536318140', - 'source': 'My Apps', - 'tags': ['application:web', 'version:1'], - 'text': 'And let me tell you all about it here!', - 'title': 'Something big happened!', - 'url': '/event/jump_to?event_id=2603387619536318140'}, - {'alert_type': 'info', - 'comments': [], - 'date_happened': 1419436865, - 'device_name': None, - 'host': None, - 'id': 2603387619536318141, - 'is_aggregate': False, - 'priority': 'normal', - 'resource': '/api/v1/events/2603387619536318141', - 'source': 'My Apps', - 'tags': ['application:web', 'version:1'], - 'text': 'And let me tell you all about it here!', - 'title': 'Something big happened!', - 'url': '/event/jump_to?event_id=2603387619536318141'}] - -zero_events = [] - - -class TestDatadogSensor(unittest.TestCase): - @patch('airflow.contrib.hooks.datadog_hook.api.Event.query') - @patch('airflow.contrib.sensors.datadog_sensor.api.Event.query') - def test_sensor_ok(self, api1, api2): - api1.return_value = at_least_one_event - api2.return_value = at_least_one_event - - sensor = DatadogSensor( - task_id='test_datadog', - datadog_conn_id='datadog_default', - from_seconds_ago=3600, - up_to_seconds_from_now=0, - priority=None, - sources=None, - tags=None, - response_check=None) - - self.assertTrue(sensor.poke({})) - - @patch('airflow.contrib.hooks.datadog_hook.api.Event.query') - @patch('airflow.contrib.sensors.datadog_sensor.api.Event.query') - def test_sensor_fail(self, api1, api2): - api1.return_value = zero_events - api2.return_value = zero_events - - sensor = DatadogSensor( - task_id='test_datadog', - datadog_conn_id='datadog_default', - from_seconds_ago=0, - up_to_seconds_from_now=0, - priority=None, - sources=None, - tags=None, - response_check=None) - - self.assertFalse(sensor.poke({})) - -if __name__ == '__main__': - unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/emr_base_sensor.py ---------------------------------------------------------------------- diff --git a/tests/contrib/sensors/emr_base_sensor.py b/tests/contrib/sensors/emr_base_sensor.py deleted file mode 100644 index 0b8ad2f..0000000 --- a/tests/contrib/sensors/emr_base_sensor.py +++ /dev/null @@ -1,126 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest - -from airflow import configuration -from airflow.exceptions import AirflowException -from airflow.contrib.sensors.emr_base_sensor import EmrBaseSensor - - -class TestEmrBaseSensor(unittest.TestCase): - def setUp(self): - configuration.load_test_config() - - def test_subclasses_that_implment_required_methods_and_constants_succeed_when_response_is_good(self): - class EmrBaseSensorSubclass(EmrBaseSensor): - NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE'] - FAILED_STATE = 'FAILED' - - def get_emr_response(self): - return { - 'SomeKey': {'State': 'COMPLETED'}, - 'ResponseMetadata': {'HTTPStatusCode': 200} - } - - def state_from_response(self, response): - return response['SomeKey']['State'] - - operator = EmrBaseSensorSubclass( - task_id='test_task', - poke_interval=2, - job_flow_id='j-8989898989', - aws_conn_id='aws_test' - ) - - operator.execute(None) - - def test_poke_returns_false_when_state_is_a_non_terminal_state(self): - class EmrBaseSensorSubclass(EmrBaseSensor): - NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE'] - FAILED_STATE = 'FAILED' - - def get_emr_response(self): - return { - 'SomeKey': {'State': 'PENDING'}, - 'ResponseMetadata': {'HTTPStatusCode': 200} - } - - def state_from_response(self, response): - return response['SomeKey']['State'] - - operator = EmrBaseSensorSubclass( - task_id='test_task', - poke_interval=2, - job_flow_id='j-8989898989', - aws_conn_id='aws_test' - ) - - self.assertEqual(operator.poke(None), False) - - def test_poke_returns_false_when_http_response_is_bad(self): - class EmrBaseSensorSubclass(EmrBaseSensor): - NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE'] - FAILED_STATE = 'FAILED' - - def get_emr_response(self): - return { - 'SomeKey': {'State': 'COMPLETED'}, - 'ResponseMetadata': {'HTTPStatusCode': 400} - } - - def state_from_response(self, response): - return response['SomeKey']['State'] - - operator = EmrBaseSensorSubclass( - task_id='test_task', - poke_interval=2, - job_flow_id='j-8989898989', - aws_conn_id='aws_test' - ) - - self.assertEqual(operator.poke(None), False) - - - def test_poke_raises_error_when_job_has_failed(self): - class EmrBaseSensorSubclass(EmrBaseSensor): - NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE'] - FAILED_STATE = 'FAILED' - - def get_emr_response(self): - return { - 'SomeKey': {'State': 'FAILED'}, - 'ResponseMetadata': {'HTTPStatusCode': 200} - } - - def state_from_response(self, response): - return response['SomeKey']['State'] - - operator = EmrBaseSensorSubclass( - task_id='test_task', - poke_interval=2, - job_flow_id='j-8989898989', - aws_conn_id='aws_test' - ) - - with self.assertRaises(AirflowException) as context: - - operator.poke(None) - - - self.assertTrue('EMR job failed' in context.exception) - - -if __name__ == '__main__': - unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/emr_job_flow_sensor.py ---------------------------------------------------------------------- diff --git a/tests/contrib/sensors/emr_job_flow_sensor.py b/tests/contrib/sensors/emr_job_flow_sensor.py deleted file mode 100644 index f993786..0000000 --- a/tests/contrib/sensors/emr_job_flow_sensor.py +++ /dev/null @@ -1,123 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest -import datetime -from dateutil.tz import tzlocal -from mock import MagicMock, patch - -from airflow import configuration -from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor - -DESCRIBE_CLUSTER_RUNNING_RETURN = { - 'Cluster': { - 'Applications': [ - {'Name': 'Spark', 'Version': '1.6.1'} - ], - 'AutoTerminate': True, - 'Configurations': [], - 'Ec2InstanceAttributes': {'IamInstanceProfile': 'EMR_EC2_DefaultRole'}, - 'Id': 'j-27ZY9GBEEU2GU', - 'LogUri': 's3n://some-location/', - 'Name': 'PiCalc', - 'NormalizedInstanceHours': 0, - 'ReleaseLabel': 'emr-4.6.0', - 'ServiceRole': 'EMR_DefaultRole', - 'Status': { - 'State': 'STARTING', - 'StateChangeReason': {}, - 'Timeline': {'CreationDateTime': datetime.datetime(2016, 6, 27, 21, 5, 2, 348000, tzinfo=tzlocal())} - }, - 'Tags': [ - {'Key': 'app', 'Value': 'analytics'}, - {'Key': 'environment', 'Value': 'development'} - ], - 'TerminationProtected': False, - 'VisibleToAllUsers': True - }, - 'ResponseMetadata': { - 'HTTPStatusCode': 200, - 'RequestId': 'd5456308-3caa-11e6-9d46-951401f04e0e' - } -} - -DESCRIBE_CLUSTER_TERMINATED_RETURN = { - 'Cluster': { - 'Applications': [ - {'Name': 'Spark', 'Version': '1.6.1'} - ], - 'AutoTerminate': True, - 'Configurations': [], - 'Ec2InstanceAttributes': {'IamInstanceProfile': 'EMR_EC2_DefaultRole'}, - 'Id': 'j-27ZY9GBEEU2GU', - 'LogUri': 's3n://some-location/', - 'Name': 'PiCalc', - 'NormalizedInstanceHours': 0, - 'ReleaseLabel': 'emr-4.6.0', - 'ServiceRole': 'EMR_DefaultRole', - 'Status': { - 'State': 'TERMINATED', - 'StateChangeReason': {}, - 'Timeline': {'CreationDateTime': datetime.datetime(2016, 6, 27, 21, 5, 2, 348000, tzinfo=tzlocal())} - }, - 'Tags': [ - {'Key': 'app', 'Value': 'analytics'}, - {'Key': 'environment', 'Value': 'development'} - ], - 'TerminationProtected': False, - 'VisibleToAllUsers': True - }, - 'ResponseMetadata': { - 'HTTPStatusCode': 200, - 'RequestId': 'd5456308-3caa-11e6-9d46-951401f04e0e' - } -} - - -class TestEmrJobFlowSensor(unittest.TestCase): - def setUp(self): - configuration.load_test_config() - - # Mock out the emr_client (moto has incorrect response) - self.mock_emr_client = MagicMock() - self.mock_emr_client.describe_cluster.side_effect = [ - DESCRIBE_CLUSTER_RUNNING_RETURN, - DESCRIBE_CLUSTER_TERMINATED_RETURN - ] - - # Mock out the emr_client creator - self.boto3_client_mock = MagicMock(return_value=self.mock_emr_client) - - - def test_execute_calls_with_the_job_flow_id_until_it_reaches_a_terminal_state(self): - with patch('boto3.client', self.boto3_client_mock): - - operator = EmrJobFlowSensor( - task_id='test_task', - poke_interval=2, - job_flow_id='j-8989898989', - aws_conn_id='aws_default' - ) - - operator.execute(None) - - # make sure we called twice - self.assertEqual(self.mock_emr_client.describe_cluster.call_count, 2) - - # make sure it was called with the job_flow_id - self.mock_emr_client.describe_cluster.assert_called_with(ClusterId='j-8989898989') - - -if __name__ == '__main__': - unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/emr_step_sensor.py ---------------------------------------------------------------------- diff --git a/tests/contrib/sensors/emr_step_sensor.py b/tests/contrib/sensors/emr_step_sensor.py deleted file mode 100644 index 58ee461..0000000 --- a/tests/contrib/sensors/emr_step_sensor.py +++ /dev/null @@ -1,119 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest -import datetime -from dateutil.tz import tzlocal -from mock import MagicMock, patch -import boto3 - -from airflow import configuration -from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor - -DESCRIBE_JOB_STEP_RUNNING_RETURN = { - 'ResponseMetadata': { - 'HTTPStatusCode': 200, - 'RequestId': '8dee8db2-3719-11e6-9e20-35b2f861a2a6' - }, - 'Step': { - 'ActionOnFailure': 'CONTINUE', - 'Config': { - 'Args': [ - '/usr/lib/spark/bin/run-example', - 'SparkPi', - '10' - ], - 'Jar': 'command-runner.jar', - 'Properties': {} - }, - 'Id': 's-VK57YR1Z9Z5N', - 'Name': 'calculate_pi', - 'Status': { - 'State': 'RUNNING', - 'StateChangeReason': {}, - 'Timeline': { - 'CreationDateTime': datetime.datetime(2016, 6, 20, 19, 0, 18, 787000, tzinfo=tzlocal()), - 'StartDateTime': datetime.datetime(2016, 6, 20, 19, 2, 34, 889000, tzinfo=tzlocal()) - } - } - } -} - -DESCRIBE_JOB_STEP_COMPLETED_RETURN = { - 'ResponseMetadata': { - 'HTTPStatusCode': 200, - 'RequestId': '8dee8db2-3719-11e6-9e20-35b2f861a2a6' - }, - 'Step': { - 'ActionOnFailure': 'CONTINUE', - 'Config': { - 'Args': [ - '/usr/lib/spark/bin/run-example', - 'SparkPi', - '10' - ], - 'Jar': 'command-runner.jar', - 'Properties': {} - }, - 'Id': 's-VK57YR1Z9Z5N', - 'Name': 'calculate_pi', - 'Status': { - 'State': 'COMPLETED', - 'StateChangeReason': {}, - 'Timeline': { - 'CreationDateTime': datetime.datetime(2016, 6, 20, 19, 0, 18, 787000, tzinfo=tzlocal()), - 'StartDateTime': datetime.datetime(2016, 6, 20, 19, 2, 34, 889000, tzinfo=tzlocal()) - } - } - } -} - - -class TestEmrStepSensor(unittest.TestCase): - def setUp(self): - configuration.load_test_config() - - # Mock out the emr_client (moto has incorrect response) - self.mock_emr_client = MagicMock() - self.mock_emr_client.describe_step.side_effect = [ - DESCRIBE_JOB_STEP_RUNNING_RETURN, - DESCRIBE_JOB_STEP_COMPLETED_RETURN - ] - - # Mock out the emr_client creator - self.boto3_client_mock = MagicMock(return_value=self.mock_emr_client) - - - def test_execute_calls_with_the_job_flow_id_and_step_id_until_it_reaches_a_terminal_state(self): - with patch('boto3.client', self.boto3_client_mock): - - operator = EmrStepSensor( - task_id='test_task', - poke_interval=1, - job_flow_id='j-8989898989', - step_id='s-VK57YR1Z9Z5N', - aws_conn_id='aws_default', - ) - - operator.execute(None) - - # make sure we called twice - self.assertEqual(self.mock_emr_client.describe_step.call_count, 2) - - # make sure it was called with the job_flow_id and step_id - self.mock_emr_client.describe_step.assert_called_with(ClusterId='j-8989898989', StepId='s-VK57YR1Z9Z5N') - - -if __name__ == '__main__': - unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/ftp_sensor.py ---------------------------------------------------------------------- diff --git a/tests/contrib/sensors/ftp_sensor.py b/tests/contrib/sensors/ftp_sensor.py deleted file mode 100644 index 50f8b8b..0000000 --- a/tests/contrib/sensors/ftp_sensor.py +++ /dev/null @@ -1,66 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import unittest -from ftplib import error_perm - -from mock import MagicMock - -from airflow.contrib.hooks.ftp_hook import FTPHook -from airflow.contrib.sensors.ftp_sensor import FTPSensor - - -class TestFTPSensor(unittest.TestCase): - def setUp(self): - super(TestFTPSensor, self).setUp() - self._create_hook_orig = FTPSensor._create_hook - self.hook_mock = MagicMock(spec=FTPHook) - - def _create_hook_mock(sensor): - mock = MagicMock() - mock.__enter__ = lambda x: self.hook_mock - - return mock - - FTPSensor._create_hook = _create_hook_mock - - def tearDown(self): - FTPSensor._create_hook = self._create_hook_orig - super(TestFTPSensor, self).tearDown() - - def test_poke(self): - op = FTPSensor(path="foobar.json", ftp_conn_id="bob_ftp", - task_id="test_task") - - self.hook_mock.get_mod_time.side_effect = \ - [error_perm("550: Can't check for file existence"), None] - - self.assertFalse(op.poke(None)) - self.assertTrue(op.poke(None)) - - def test_poke_fails_due_error(self): - op = FTPSensor(path="foobar.json", ftp_conn_id="bob_ftp", - task_id="test_task") - - self.hook_mock.get_mod_time.side_effect = \ - error_perm("530: Login authentication failed") - - with self.assertRaises(error_perm) as context: - op.execute(None) - - self.assertTrue("530" in str(context.exception)) - - -if __name__ == '__main__': - unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/hdfs_sensors.py ---------------------------------------------------------------------- diff --git a/tests/contrib/sensors/hdfs_sensors.py b/tests/contrib/sensors/hdfs_sensors.py deleted file mode 100644 index 0e2ed0c..0000000 --- a/tests/contrib/sensors/hdfs_sensors.py +++ /dev/null @@ -1,251 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -import logging -import sys -import unittest -import re -from datetime import timedelta -from airflow.contrib.sensors.hdfs_sensors import HdfsSensorFolder, HdfsSensorRegex -from airflow.exceptions import AirflowSensorTimeout - - -class HdfsSensorFolderTests(unittest.TestCase): - def setUp(self): - if sys.version_info[0] == 3: - raise unittest.SkipTest('HdfsSensor won\'t work with python3. No need to test anything here') - from tests.core import FakeHDFSHook - self.hook = FakeHDFSHook - self.logger = logging.getLogger() - self.logger.setLevel(logging.DEBUG) - - def test_should_be_empty_directory(self): - """ - test the empty directory behaviour - :return: - """ - # Given - self.logger.debug('#' * 10) - self.logger.debug('Running %s', self._testMethodName) - self.logger.debug('#' * 10) - task = HdfsSensorFolder(task_id='Should_be_empty_directory', - filepath='/datadirectory/empty_directory', - be_empty=True, - timeout=1, - retry_delay=timedelta(seconds=1), - poke_interval=1, - hook=self.hook) - - # When - task.execute(None) - - # Then - # Nothing happens, nothing is raised exec is ok - - def test_should_be_empty_directory_fail(self): - """ - test the empty directory behaviour - :return: - """ - # Given - self.logger.debug('#' * 10) - self.logger.debug('Running %s', self._testMethodName) - self.logger.debug('#' * 10) - task = HdfsSensorFolder(task_id='Should_be_empty_directory_fail', - filepath='/datadirectory/not_empty_directory', - be_empty=True, - timeout=1, - retry_delay=timedelta(seconds=1), - poke_interval=1, - hook=self.hook) - - # When - # Then - with self.assertRaises(AirflowSensorTimeout): - task.execute(None) - - def test_should_be_a_non_empty_directory(self): - """ - test the empty directory behaviour - :return: - """ - # Given - self.logger.debug('#' * 10) - self.logger.debug('Running %s', self._testMethodName) - self.logger.debug('#' * 10) - task = HdfsSensorFolder(task_id='Should_be_non_empty_directory', - filepath='/datadirectory/not_empty_directory', - timeout=1, - retry_delay=timedelta(seconds=1), - poke_interval=1, - hook=self.hook) - - # When - task.execute(None) - - # Then - # Nothing happens, nothing is raised exec is ok - - def test_should_be_non_empty_directory_fail(self): - """ - test the empty directory behaviour - :return: - """ - # Given - self.logger.debug('#' * 10) - self.logger.debug('Running %s', self._testMethodName) - self.logger.debug('#' * 10) - task = HdfsSensorFolder(task_id='Should_be_empty_directory_fail', - filepath='/datadirectory/empty_directory', - timeout=1, - retry_delay=timedelta(seconds=1), - poke_interval=1, - hook=self.hook) - - # When - # Then - with self.assertRaises(AirflowSensorTimeout): - task.execute(None) - - -class HdfsSensorRegexTests(unittest.TestCase): - def setUp(self): - if sys.version_info[0] == 3: - raise unittest.SkipTest('HdfsSensor won\'t work with python3. No need to test anything here') - from tests.core import FakeHDFSHook - self.hook = FakeHDFSHook - self.logger = logging.getLogger() - self.logger.setLevel(logging.DEBUG) - - def test_should_match_regex(self): - """ - test the empty directory behaviour - :return: - """ - # Given - self.logger.debug('#' * 10) - self.logger.debug('Running %s', self._testMethodName) - self.logger.debug('#' * 10) - compiled_regex = re.compile("test[1-2]file") - task = HdfsSensorRegex(task_id='Should_match_the_regex', - filepath='/datadirectory/regex_dir', - regex=compiled_regex, - timeout=1, - retry_delay=timedelta(seconds=1), - poke_interval=1, - hook=self.hook) - - # When - task.execute(None) - - # Then - # Nothing happens, nothing is raised exec is ok - - def test_should_not_match_regex(self): - """ - test the empty directory behaviour - :return: - """ - # Given - self.logger.debug('#' * 10) - self.logger.debug('Running %s', self._testMethodName) - self.logger.debug('#' * 10) - compiled_regex = re.compile("^IDoNotExist") - task = HdfsSensorRegex(task_id='Should_not_match_the_regex', - filepath='/datadirectory/regex_dir', - regex=compiled_regex, - timeout=1, - retry_delay=timedelta(seconds=1), - poke_interval=1, - hook=self.hook) - - # When - # Then - with self.assertRaises(AirflowSensorTimeout): - task.execute(None) - - def test_should_match_regex_and_filesize(self): - """ - test the file size behaviour with regex - :return: - """ - # Given - self.logger.debug('#' * 10) - self.logger.debug('Running %s', self._testMethodName) - self.logger.debug('#' * 10) - compiled_regex = re.compile("test[1-2]file") - task = HdfsSensorRegex(task_id='Should_match_the_regex_and_filesize', - filepath='/datadirectory/regex_dir', - regex=compiled_regex, - ignore_copying=True, - ignored_ext=['_COPYING_', 'sftp'], - file_size=10, - timeout=1, - retry_delay=timedelta(seconds=1), - poke_interval=1, - hook=self.hook) - - # When - task.execute(None) - - # Then - # Nothing happens, nothing is raised exec is ok - - def test_should_match_regex_but_filesize(self): - """ - test the file size behaviour with regex - :return: - """ - # Given - self.logger.debug('#' * 10) - self.logger.debug('Running %s', self._testMethodName) - self.logger.debug('#' * 10) - compiled_regex = re.compile("test[1-2]file") - task = HdfsSensorRegex(task_id='Should_match_the_regex_but_filesize', - filepath='/datadirectory/regex_dir', - regex=compiled_regex, - file_size=20, - timeout=1, - retry_delay=timedelta(seconds=1), - poke_interval=1, - hook=self.hook) - - # When - # Then - with self.assertRaises(AirflowSensorTimeout): - task.execute(None) - - def test_should_match_regex_but_copyingext(self): - """ - test the file size behaviour with regex - :return: - """ - # Given - self.logger.debug('#' * 10) - self.logger.debug('Running %s', self._testMethodName) - self.logger.debug('#' * 10) - compiled_regex = re.compile("copying_file_\d+.txt") - task = HdfsSensorRegex(task_id='Should_match_the_regex_but_filesize', - filepath='/datadirectory/regex_dir', - regex=compiled_regex, - ignored_ext=['_COPYING_', 'sftp'], - file_size=20, - timeout=1, - retry_delay=timedelta(seconds=1), - poke_interval=1, - hook=self.hook) - - # When - # Then - with self.assertRaises(AirflowSensorTimeout): - task.execute(None) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/jira_sensor_test.py ---------------------------------------------------------------------- diff --git a/tests/contrib/sensors/jira_sensor_test.py b/tests/contrib/sensors/jira_sensor_test.py deleted file mode 100644 index 77ca97f..0000000 --- a/tests/contrib/sensors/jira_sensor_test.py +++ /dev/null @@ -1,85 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -# - -import unittest -import datetime -from mock import Mock -from mock import patch - -from airflow import DAG, configuration -from airflow.contrib.sensors.jira_sensor import JiraTicketSensor -from airflow import models -from airflow.utils import db - -DEFAULT_DATE = datetime.datetime(2017, 1, 1) -jira_client_mock = Mock( - name="jira_client_for_test" -) - -minimal_test_ticket = { - "id": "911539", - "self": "https://sandbox.localhost/jira/rest/api/2/issue/911539", - "key": "TEST-1226", - "fields": { - "labels": [ - "test-label-1", - "test-label-2" - ], - "description": "this is a test description", - } -} - - -class TestJiraSensor(unittest.TestCase): - def setUp(self): - configuration.load_test_config() - args = { - 'owner': 'airflow', - 'start_date': DEFAULT_DATE - } - dag = DAG('test_dag_id', default_args=args) - self.dag = dag - db.merge_conn( - models.Connection( - conn_id='jira_default', conn_type='jira', - host='https://localhost/jira/', port=443, - extra='{"verify": "False", "project": "AIRFLOW"}')) - - @patch("airflow.contrib.hooks.jira_hook.JIRA", - autospec=True, return_value=jira_client_mock) - def test_issue_label_set(self, jira_mock): - jira_mock.return_value.issue.return_value = minimal_test_ticket - - ticket_label_sensor = JiraTicketSensor(task_id='search-ticket-test', - ticket_id='TEST-1226', - field_checker_func= - TestJiraSensor.field_checker_func, - timeout=518400, - poke_interval=10, - dag=self.dag) - - ticket_label_sensor.run(start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE, ignore_ti_state=True) - - self.assertTrue(jira_mock.called) - self.assertTrue(jira_mock.return_value.issue.called) - - @staticmethod - def field_checker_func(context, issue): - return "test-label-1" in issue['fields']['labels'] - - -if __name__ == '__main__': - unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/redis_sensor.py ---------------------------------------------------------------------- diff --git a/tests/contrib/sensors/redis_sensor.py b/tests/contrib/sensors/redis_sensor.py deleted file mode 100644 index 8022a92..0000000 --- a/tests/contrib/sensors/redis_sensor.py +++ /dev/null @@ -1,64 +0,0 @@ -# -*- coding: utf-8 -*- -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import unittest -import datetime - -from mock import patch - -from airflow import DAG -from airflow import configuration -from airflow.contrib.sensors.redis_key_sensor import RedisKeySensor - -DEFAULT_DATE = datetime.datetime(2017, 1, 1) - - -class TestRedisSensor(unittest.TestCase): - - def setUp(self): - configuration.load_test_config() - args = { - 'owner': 'airflow', - 'start_date': DEFAULT_DATE - } - - self.dag = DAG('test_dag_id', default_args=args) - self.sensor = RedisKeySensor( - task_id='test_task', - redis_conn_id='redis_default', - dag=self.dag, - key='test_key' - ) - - @patch("airflow.contrib.hooks.redis_hook.RedisHook.key_exists") - def test_poke(self, key_exists): - key_exists.return_value = True - self.assertTrue(self.sensor.poke(None)) - - key_exists.return_value = False - self.assertFalse(self.sensor.poke(None)) - - @patch("airflow.contrib.hooks.redis_hook.StrictRedis.exists") - def test_existing_key_called(self, redis_client_exists): - self.sensor.run( - start_date=DEFAULT_DATE, - end_date=DEFAULT_DATE, ignore_ti_state=True - ) - - self.assertTrue(redis_client_exists.called_with('test_key')) - - -if __name__ == '__main__': - unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/test_datadog_sensor.py ---------------------------------------------------------------------- diff --git a/tests/contrib/sensors/test_datadog_sensor.py b/tests/contrib/sensors/test_datadog_sensor.py new file mode 100644 index 0000000..d845c54 --- /dev/null +++ b/tests/contrib/sensors/test_datadog_sensor.py @@ -0,0 +1,106 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +import unittest +from mock import patch + +from airflow import configuration +from airflow.utils import db +from airflow import models +from airflow.contrib.sensors.datadog_sensor import DatadogSensor + + +at_least_one_event = [{'alert_type': 'info', + 'comments': [], + 'date_happened': 1419436860, + 'device_name': None, + 'host': None, + 'id': 2603387619536318140, + 'is_aggregate': False, + 'priority': 'normal', + 'resource': '/api/v1/events/2603387619536318140', + 'source': 'My Apps', + 'tags': ['application:web', 'version:1'], + 'text': 'And let me tell you all about it here!', + 'title': 'Something big happened!', + 'url': '/event/jump_to?event_id=2603387619536318140'}, + {'alert_type': 'info', + 'comments': [], + 'date_happened': 1419436865, + 'device_name': None, + 'host': None, + 'id': 2603387619536318141, + 'is_aggregate': False, + 'priority': 'normal', + 'resource': '/api/v1/events/2603387619536318141', + 'source': 'My Apps', + 'tags': ['application:web', 'version:1'], + 'text': 'And let me tell you all about it here!', + 'title': 'Something big happened!', + 'url': '/event/jump_to?event_id=2603387619536318141'}] + +zero_events = [] + + +class TestDatadogSensor(unittest.TestCase): + + def setUp(self): + configuration.load_test_config() + db.merge_conn( + models.Connection( + conn_id='datadog_default', conn_type='datadog', + login='login', password='password', + extra=json.dumps({'api_key': 'api_key', 'app_key': 'app_key'}) + ) + ) + + @patch('airflow.contrib.hooks.datadog_hook.api.Event.query') + @patch('airflow.contrib.sensors.datadog_sensor.api.Event.query') + def test_sensor_ok(self, api1, api2): + api1.return_value = at_least_one_event + api2.return_value = at_least_one_event + + sensor = DatadogSensor( + task_id='test_datadog', + datadog_conn_id='datadog_default', + from_seconds_ago=3600, + up_to_seconds_from_now=0, + priority=None, + sources=None, + tags=None, + response_check=None) + + self.assertTrue(sensor.poke({})) + + @patch('airflow.contrib.hooks.datadog_hook.api.Event.query') + @patch('airflow.contrib.sensors.datadog_sensor.api.Event.query') + def test_sensor_fail(self, api1, api2): + api1.return_value = zero_events + api2.return_value = zero_events + + sensor = DatadogSensor( + task_id='test_datadog', + datadog_conn_id='datadog_default', + from_seconds_ago=0, + up_to_seconds_from_now=0, + priority=None, + sources=None, + tags=None, + response_check=None) + + self.assertFalse(sensor.poke({})) + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/test_emr_base_sensor.py ---------------------------------------------------------------------- diff --git a/tests/contrib/sensors/test_emr_base_sensor.py b/tests/contrib/sensors/test_emr_base_sensor.py new file mode 100644 index 0000000..9c39abb --- /dev/null +++ b/tests/contrib/sensors/test_emr_base_sensor.py @@ -0,0 +1,126 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest + +from airflow import configuration +from airflow.exceptions import AirflowException +from airflow.contrib.sensors.emr_base_sensor import EmrBaseSensor + + +class TestEmrBaseSensor(unittest.TestCase): + def setUp(self): + configuration.load_test_config() + + def test_subclasses_that_implment_required_methods_and_constants_succeed_when_response_is_good(self): + class EmrBaseSensorSubclass(EmrBaseSensor): + NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE'] + FAILED_STATE = 'FAILED' + + def get_emr_response(self): + return { + 'SomeKey': {'State': 'COMPLETED'}, + 'ResponseMetadata': {'HTTPStatusCode': 200} + } + + def state_from_response(self, response): + return response['SomeKey']['State'] + + operator = EmrBaseSensorSubclass( + task_id='test_task', + poke_interval=2, + job_flow_id='j-8989898989', + aws_conn_id='aws_test' + ) + + operator.execute(None) + + def test_poke_returns_false_when_state_is_a_non_terminal_state(self): + class EmrBaseSensorSubclass(EmrBaseSensor): + NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE'] + FAILED_STATE = 'FAILED' + + def get_emr_response(self): + return { + 'SomeKey': {'State': 'PENDING'}, + 'ResponseMetadata': {'HTTPStatusCode': 200} + } + + def state_from_response(self, response): + return response['SomeKey']['State'] + + operator = EmrBaseSensorSubclass( + task_id='test_task', + poke_interval=2, + job_flow_id='j-8989898989', + aws_conn_id='aws_test' + ) + + self.assertEqual(operator.poke(None), False) + + def test_poke_returns_false_when_http_response_is_bad(self): + class EmrBaseSensorSubclass(EmrBaseSensor): + NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE'] + FAILED_STATE = 'FAILED' + + def get_emr_response(self): + return { + 'SomeKey': {'State': 'COMPLETED'}, + 'ResponseMetadata': {'HTTPStatusCode': 400} + } + + def state_from_response(self, response): + return response['SomeKey']['State'] + + operator = EmrBaseSensorSubclass( + task_id='test_task', + poke_interval=2, + job_flow_id='j-8989898989', + aws_conn_id='aws_test' + ) + + self.assertEqual(operator.poke(None), False) + + + def test_poke_raises_error_when_job_has_failed(self): + class EmrBaseSensorSubclass(EmrBaseSensor): + NON_TERMINAL_STATES = ['PENDING', 'RUNNING', 'CONTINUE'] + FAILED_STATE = 'FAILED' + + def get_emr_response(self): + return { + 'SomeKey': {'State': 'FAILED'}, + 'ResponseMetadata': {'HTTPStatusCode': 200} + } + + def state_from_response(self, response): + return response['SomeKey']['State'] + + operator = EmrBaseSensorSubclass( + task_id='test_task', + poke_interval=2, + job_flow_id='j-8989898989', + aws_conn_id='aws_test' + ) + + with self.assertRaises(AirflowException) as context: + + operator.poke(None) + + + self.assertIn('EMR job failed', str(context.exception)) + + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/test_emr_job_flow_sensor.py ---------------------------------------------------------------------- diff --git a/tests/contrib/sensors/test_emr_job_flow_sensor.py b/tests/contrib/sensors/test_emr_job_flow_sensor.py new file mode 100644 index 0000000..f993786 --- /dev/null +++ b/tests/contrib/sensors/test_emr_job_flow_sensor.py @@ -0,0 +1,123 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import datetime +from dateutil.tz import tzlocal +from mock import MagicMock, patch + +from airflow import configuration +from airflow.contrib.sensors.emr_job_flow_sensor import EmrJobFlowSensor + +DESCRIBE_CLUSTER_RUNNING_RETURN = { + 'Cluster': { + 'Applications': [ + {'Name': 'Spark', 'Version': '1.6.1'} + ], + 'AutoTerminate': True, + 'Configurations': [], + 'Ec2InstanceAttributes': {'IamInstanceProfile': 'EMR_EC2_DefaultRole'}, + 'Id': 'j-27ZY9GBEEU2GU', + 'LogUri': 's3n://some-location/', + 'Name': 'PiCalc', + 'NormalizedInstanceHours': 0, + 'ReleaseLabel': 'emr-4.6.0', + 'ServiceRole': 'EMR_DefaultRole', + 'Status': { + 'State': 'STARTING', + 'StateChangeReason': {}, + 'Timeline': {'CreationDateTime': datetime.datetime(2016, 6, 27, 21, 5, 2, 348000, tzinfo=tzlocal())} + }, + 'Tags': [ + {'Key': 'app', 'Value': 'analytics'}, + {'Key': 'environment', 'Value': 'development'} + ], + 'TerminationProtected': False, + 'VisibleToAllUsers': True + }, + 'ResponseMetadata': { + 'HTTPStatusCode': 200, + 'RequestId': 'd5456308-3caa-11e6-9d46-951401f04e0e' + } +} + +DESCRIBE_CLUSTER_TERMINATED_RETURN = { + 'Cluster': { + 'Applications': [ + {'Name': 'Spark', 'Version': '1.6.1'} + ], + 'AutoTerminate': True, + 'Configurations': [], + 'Ec2InstanceAttributes': {'IamInstanceProfile': 'EMR_EC2_DefaultRole'}, + 'Id': 'j-27ZY9GBEEU2GU', + 'LogUri': 's3n://some-location/', + 'Name': 'PiCalc', + 'NormalizedInstanceHours': 0, + 'ReleaseLabel': 'emr-4.6.0', + 'ServiceRole': 'EMR_DefaultRole', + 'Status': { + 'State': 'TERMINATED', + 'StateChangeReason': {}, + 'Timeline': {'CreationDateTime': datetime.datetime(2016, 6, 27, 21, 5, 2, 348000, tzinfo=tzlocal())} + }, + 'Tags': [ + {'Key': 'app', 'Value': 'analytics'}, + {'Key': 'environment', 'Value': 'development'} + ], + 'TerminationProtected': False, + 'VisibleToAllUsers': True + }, + 'ResponseMetadata': { + 'HTTPStatusCode': 200, + 'RequestId': 'd5456308-3caa-11e6-9d46-951401f04e0e' + } +} + + +class TestEmrJobFlowSensor(unittest.TestCase): + def setUp(self): + configuration.load_test_config() + + # Mock out the emr_client (moto has incorrect response) + self.mock_emr_client = MagicMock() + self.mock_emr_client.describe_cluster.side_effect = [ + DESCRIBE_CLUSTER_RUNNING_RETURN, + DESCRIBE_CLUSTER_TERMINATED_RETURN + ] + + # Mock out the emr_client creator + self.boto3_client_mock = MagicMock(return_value=self.mock_emr_client) + + + def test_execute_calls_with_the_job_flow_id_until_it_reaches_a_terminal_state(self): + with patch('boto3.client', self.boto3_client_mock): + + operator = EmrJobFlowSensor( + task_id='test_task', + poke_interval=2, + job_flow_id='j-8989898989', + aws_conn_id='aws_default' + ) + + operator.execute(None) + + # make sure we called twice + self.assertEqual(self.mock_emr_client.describe_cluster.call_count, 2) + + # make sure it was called with the job_flow_id + self.mock_emr_client.describe_cluster.assert_called_with(ClusterId='j-8989898989') + + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/test_emr_step_sensor.py ---------------------------------------------------------------------- diff --git a/tests/contrib/sensors/test_emr_step_sensor.py b/tests/contrib/sensors/test_emr_step_sensor.py new file mode 100644 index 0000000..58ee461 --- /dev/null +++ b/tests/contrib/sensors/test_emr_step_sensor.py @@ -0,0 +1,119 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +import datetime +from dateutil.tz import tzlocal +from mock import MagicMock, patch +import boto3 + +from airflow import configuration +from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor + +DESCRIBE_JOB_STEP_RUNNING_RETURN = { + 'ResponseMetadata': { + 'HTTPStatusCode': 200, + 'RequestId': '8dee8db2-3719-11e6-9e20-35b2f861a2a6' + }, + 'Step': { + 'ActionOnFailure': 'CONTINUE', + 'Config': { + 'Args': [ + '/usr/lib/spark/bin/run-example', + 'SparkPi', + '10' + ], + 'Jar': 'command-runner.jar', + 'Properties': {} + }, + 'Id': 's-VK57YR1Z9Z5N', + 'Name': 'calculate_pi', + 'Status': { + 'State': 'RUNNING', + 'StateChangeReason': {}, + 'Timeline': { + 'CreationDateTime': datetime.datetime(2016, 6, 20, 19, 0, 18, 787000, tzinfo=tzlocal()), + 'StartDateTime': datetime.datetime(2016, 6, 20, 19, 2, 34, 889000, tzinfo=tzlocal()) + } + } + } +} + +DESCRIBE_JOB_STEP_COMPLETED_RETURN = { + 'ResponseMetadata': { + 'HTTPStatusCode': 200, + 'RequestId': '8dee8db2-3719-11e6-9e20-35b2f861a2a6' + }, + 'Step': { + 'ActionOnFailure': 'CONTINUE', + 'Config': { + 'Args': [ + '/usr/lib/spark/bin/run-example', + 'SparkPi', + '10' + ], + 'Jar': 'command-runner.jar', + 'Properties': {} + }, + 'Id': 's-VK57YR1Z9Z5N', + 'Name': 'calculate_pi', + 'Status': { + 'State': 'COMPLETED', + 'StateChangeReason': {}, + 'Timeline': { + 'CreationDateTime': datetime.datetime(2016, 6, 20, 19, 0, 18, 787000, tzinfo=tzlocal()), + 'StartDateTime': datetime.datetime(2016, 6, 20, 19, 2, 34, 889000, tzinfo=tzlocal()) + } + } + } +} + + +class TestEmrStepSensor(unittest.TestCase): + def setUp(self): + configuration.load_test_config() + + # Mock out the emr_client (moto has incorrect response) + self.mock_emr_client = MagicMock() + self.mock_emr_client.describe_step.side_effect = [ + DESCRIBE_JOB_STEP_RUNNING_RETURN, + DESCRIBE_JOB_STEP_COMPLETED_RETURN + ] + + # Mock out the emr_client creator + self.boto3_client_mock = MagicMock(return_value=self.mock_emr_client) + + + def test_execute_calls_with_the_job_flow_id_and_step_id_until_it_reaches_a_terminal_state(self): + with patch('boto3.client', self.boto3_client_mock): + + operator = EmrStepSensor( + task_id='test_task', + poke_interval=1, + job_flow_id='j-8989898989', + step_id='s-VK57YR1Z9Z5N', + aws_conn_id='aws_default', + ) + + operator.execute(None) + + # make sure we called twice + self.assertEqual(self.mock_emr_client.describe_step.call_count, 2) + + # make sure it was called with the job_flow_id and step_id + self.mock_emr_client.describe_step.assert_called_with(ClusterId='j-8989898989', StepId='s-VK57YR1Z9Z5N') + + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/test_ftp_sensor.py ---------------------------------------------------------------------- diff --git a/tests/contrib/sensors/test_ftp_sensor.py b/tests/contrib/sensors/test_ftp_sensor.py new file mode 100644 index 0000000..50f8b8b --- /dev/null +++ b/tests/contrib/sensors/test_ftp_sensor.py @@ -0,0 +1,66 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import unittest +from ftplib import error_perm + +from mock import MagicMock + +from airflow.contrib.hooks.ftp_hook import FTPHook +from airflow.contrib.sensors.ftp_sensor import FTPSensor + + +class TestFTPSensor(unittest.TestCase): + def setUp(self): + super(TestFTPSensor, self).setUp() + self._create_hook_orig = FTPSensor._create_hook + self.hook_mock = MagicMock(spec=FTPHook) + + def _create_hook_mock(sensor): + mock = MagicMock() + mock.__enter__ = lambda x: self.hook_mock + + return mock + + FTPSensor._create_hook = _create_hook_mock + + def tearDown(self): + FTPSensor._create_hook = self._create_hook_orig + super(TestFTPSensor, self).tearDown() + + def test_poke(self): + op = FTPSensor(path="foobar.json", ftp_conn_id="bob_ftp", + task_id="test_task") + + self.hook_mock.get_mod_time.side_effect = \ + [error_perm("550: Can't check for file existence"), None] + + self.assertFalse(op.poke(None)) + self.assertTrue(op.poke(None)) + + def test_poke_fails_due_error(self): + op = FTPSensor(path="foobar.json", ftp_conn_id="bob_ftp", + task_id="test_task") + + self.hook_mock.get_mod_time.side_effect = \ + error_perm("530: Login authentication failed") + + with self.assertRaises(error_perm) as context: + op.execute(None) + + self.assertTrue("530" in str(context.exception)) + + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/test_hdfs_sensors.py ---------------------------------------------------------------------- diff --git a/tests/contrib/sensors/test_hdfs_sensors.py b/tests/contrib/sensors/test_hdfs_sensors.py new file mode 100644 index 0000000..0e2ed0c --- /dev/null +++ b/tests/contrib/sensors/test_hdfs_sensors.py @@ -0,0 +1,251 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging +import sys +import unittest +import re +from datetime import timedelta +from airflow.contrib.sensors.hdfs_sensors import HdfsSensorFolder, HdfsSensorRegex +from airflow.exceptions import AirflowSensorTimeout + + +class HdfsSensorFolderTests(unittest.TestCase): + def setUp(self): + if sys.version_info[0] == 3: + raise unittest.SkipTest('HdfsSensor won\'t work with python3. No need to test anything here') + from tests.core import FakeHDFSHook + self.hook = FakeHDFSHook + self.logger = logging.getLogger() + self.logger.setLevel(logging.DEBUG) + + def test_should_be_empty_directory(self): + """ + test the empty directory behaviour + :return: + """ + # Given + self.logger.debug('#' * 10) + self.logger.debug('Running %s', self._testMethodName) + self.logger.debug('#' * 10) + task = HdfsSensorFolder(task_id='Should_be_empty_directory', + filepath='/datadirectory/empty_directory', + be_empty=True, + timeout=1, + retry_delay=timedelta(seconds=1), + poke_interval=1, + hook=self.hook) + + # When + task.execute(None) + + # Then + # Nothing happens, nothing is raised exec is ok + + def test_should_be_empty_directory_fail(self): + """ + test the empty directory behaviour + :return: + """ + # Given + self.logger.debug('#' * 10) + self.logger.debug('Running %s', self._testMethodName) + self.logger.debug('#' * 10) + task = HdfsSensorFolder(task_id='Should_be_empty_directory_fail', + filepath='/datadirectory/not_empty_directory', + be_empty=True, + timeout=1, + retry_delay=timedelta(seconds=1), + poke_interval=1, + hook=self.hook) + + # When + # Then + with self.assertRaises(AirflowSensorTimeout): + task.execute(None) + + def test_should_be_a_non_empty_directory(self): + """ + test the empty directory behaviour + :return: + """ + # Given + self.logger.debug('#' * 10) + self.logger.debug('Running %s', self._testMethodName) + self.logger.debug('#' * 10) + task = HdfsSensorFolder(task_id='Should_be_non_empty_directory', + filepath='/datadirectory/not_empty_directory', + timeout=1, + retry_delay=timedelta(seconds=1), + poke_interval=1, + hook=self.hook) + + # When + task.execute(None) + + # Then + # Nothing happens, nothing is raised exec is ok + + def test_should_be_non_empty_directory_fail(self): + """ + test the empty directory behaviour + :return: + """ + # Given + self.logger.debug('#' * 10) + self.logger.debug('Running %s', self._testMethodName) + self.logger.debug('#' * 10) + task = HdfsSensorFolder(task_id='Should_be_empty_directory_fail', + filepath='/datadirectory/empty_directory', + timeout=1, + retry_delay=timedelta(seconds=1), + poke_interval=1, + hook=self.hook) + + # When + # Then + with self.assertRaises(AirflowSensorTimeout): + task.execute(None) + + +class HdfsSensorRegexTests(unittest.TestCase): + def setUp(self): + if sys.version_info[0] == 3: + raise unittest.SkipTest('HdfsSensor won\'t work with python3. No need to test anything here') + from tests.core import FakeHDFSHook + self.hook = FakeHDFSHook + self.logger = logging.getLogger() + self.logger.setLevel(logging.DEBUG) + + def test_should_match_regex(self): + """ + test the empty directory behaviour + :return: + """ + # Given + self.logger.debug('#' * 10) + self.logger.debug('Running %s', self._testMethodName) + self.logger.debug('#' * 10) + compiled_regex = re.compile("test[1-2]file") + task = HdfsSensorRegex(task_id='Should_match_the_regex', + filepath='/datadirectory/regex_dir', + regex=compiled_regex, + timeout=1, + retry_delay=timedelta(seconds=1), + poke_interval=1, + hook=self.hook) + + # When + task.execute(None) + + # Then + # Nothing happens, nothing is raised exec is ok + + def test_should_not_match_regex(self): + """ + test the empty directory behaviour + :return: + """ + # Given + self.logger.debug('#' * 10) + self.logger.debug('Running %s', self._testMethodName) + self.logger.debug('#' * 10) + compiled_regex = re.compile("^IDoNotExist") + task = HdfsSensorRegex(task_id='Should_not_match_the_regex', + filepath='/datadirectory/regex_dir', + regex=compiled_regex, + timeout=1, + retry_delay=timedelta(seconds=1), + poke_interval=1, + hook=self.hook) + + # When + # Then + with self.assertRaises(AirflowSensorTimeout): + task.execute(None) + + def test_should_match_regex_and_filesize(self): + """ + test the file size behaviour with regex + :return: + """ + # Given + self.logger.debug('#' * 10) + self.logger.debug('Running %s', self._testMethodName) + self.logger.debug('#' * 10) + compiled_regex = re.compile("test[1-2]file") + task = HdfsSensorRegex(task_id='Should_match_the_regex_and_filesize', + filepath='/datadirectory/regex_dir', + regex=compiled_regex, + ignore_copying=True, + ignored_ext=['_COPYING_', 'sftp'], + file_size=10, + timeout=1, + retry_delay=timedelta(seconds=1), + poke_interval=1, + hook=self.hook) + + # When + task.execute(None) + + # Then + # Nothing happens, nothing is raised exec is ok + + def test_should_match_regex_but_filesize(self): + """ + test the file size behaviour with regex + :return: + """ + # Given + self.logger.debug('#' * 10) + self.logger.debug('Running %s', self._testMethodName) + self.logger.debug('#' * 10) + compiled_regex = re.compile("test[1-2]file") + task = HdfsSensorRegex(task_id='Should_match_the_regex_but_filesize', + filepath='/datadirectory/regex_dir', + regex=compiled_regex, + file_size=20, + timeout=1, + retry_delay=timedelta(seconds=1), + poke_interval=1, + hook=self.hook) + + # When + # Then + with self.assertRaises(AirflowSensorTimeout): + task.execute(None) + + def test_should_match_regex_but_copyingext(self): + """ + test the file size behaviour with regex + :return: + """ + # Given + self.logger.debug('#' * 10) + self.logger.debug('Running %s', self._testMethodName) + self.logger.debug('#' * 10) + compiled_regex = re.compile("copying_file_\d+.txt") + task = HdfsSensorRegex(task_id='Should_match_the_regex_but_filesize', + filepath='/datadirectory/regex_dir', + regex=compiled_regex, + ignored_ext=['_COPYING_', 'sftp'], + file_size=20, + timeout=1, + retry_delay=timedelta(seconds=1), + poke_interval=1, + hook=self.hook) + + # When + # Then + with self.assertRaises(AirflowSensorTimeout): + task.execute(None) http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/test_jira_sensor_test.py ---------------------------------------------------------------------- diff --git a/tests/contrib/sensors/test_jira_sensor_test.py b/tests/contrib/sensors/test_jira_sensor_test.py new file mode 100644 index 0000000..77ca97f --- /dev/null +++ b/tests/contrib/sensors/test_jira_sensor_test.py @@ -0,0 +1,85 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import unittest +import datetime +from mock import Mock +from mock import patch + +from airflow import DAG, configuration +from airflow.contrib.sensors.jira_sensor import JiraTicketSensor +from airflow import models +from airflow.utils import db + +DEFAULT_DATE = datetime.datetime(2017, 1, 1) +jira_client_mock = Mock( + name="jira_client_for_test" +) + +minimal_test_ticket = { + "id": "911539", + "self": "https://sandbox.localhost/jira/rest/api/2/issue/911539", + "key": "TEST-1226", + "fields": { + "labels": [ + "test-label-1", + "test-label-2" + ], + "description": "this is a test description", + } +} + + +class TestJiraSensor(unittest.TestCase): + def setUp(self): + configuration.load_test_config() + args = { + 'owner': 'airflow', + 'start_date': DEFAULT_DATE + } + dag = DAG('test_dag_id', default_args=args) + self.dag = dag + db.merge_conn( + models.Connection( + conn_id='jira_default', conn_type='jira', + host='https://localhost/jira/', port=443, + extra='{"verify": "False", "project": "AIRFLOW"}')) + + @patch("airflow.contrib.hooks.jira_hook.JIRA", + autospec=True, return_value=jira_client_mock) + def test_issue_label_set(self, jira_mock): + jira_mock.return_value.issue.return_value = minimal_test_ticket + + ticket_label_sensor = JiraTicketSensor(task_id='search-ticket-test', + ticket_id='TEST-1226', + field_checker_func= + TestJiraSensor.field_checker_func, + timeout=518400, + poke_interval=10, + dag=self.dag) + + ticket_label_sensor.run(start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE, ignore_ti_state=True) + + self.assertTrue(jira_mock.called) + self.assertTrue(jira_mock.return_value.issue.called) + + @staticmethod + def field_checker_func(context, issue): + return "test-label-1" in issue['fields']['labels'] + + +if __name__ == '__main__': + unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/219c5064/tests/contrib/sensors/test_redis_sensor.py ---------------------------------------------------------------------- diff --git a/tests/contrib/sensors/test_redis_sensor.py b/tests/contrib/sensors/test_redis_sensor.py new file mode 100644 index 0000000..8022a92 --- /dev/null +++ b/tests/contrib/sensors/test_redis_sensor.py @@ -0,0 +1,64 @@ +# -*- coding: utf-8 -*- +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import unittest +import datetime + +from mock import patch + +from airflow import DAG +from airflow import configuration +from airflow.contrib.sensors.redis_key_sensor import RedisKeySensor + +DEFAULT_DATE = datetime.datetime(2017, 1, 1) + + +class TestRedisSensor(unittest.TestCase): + + def setUp(self): + configuration.load_test_config() + args = { + 'owner': 'airflow', + 'start_date': DEFAULT_DATE + } + + self.dag = DAG('test_dag_id', default_args=args) + self.sensor = RedisKeySensor( + task_id='test_task', + redis_conn_id='redis_default', + dag=self.dag, + key='test_key' + ) + + @patch("airflow.contrib.hooks.redis_hook.RedisHook.key_exists") + def test_poke(self, key_exists): + key_exists.return_value = True + self.assertTrue(self.sensor.poke(None)) + + key_exists.return_value = False + self.assertFalse(self.sensor.poke(None)) + + @patch("airflow.contrib.hooks.redis_hook.StrictRedis.exists") + def test_existing_key_called(self, redis_client_exists): + self.sensor.run( + start_date=DEFAULT_DATE, + end_date=DEFAULT_DATE, ignore_ti_state=True + ) + + self.assertTrue(redis_client_exists.called_with('test_key')) + + +if __name__ == '__main__': + unittest.main()
