Repository: incubator-airflow Updated Branches: refs/heads/master fd6772116 -> 44551e249
[AIRFLOW-713] Jinjafy {EmrCreateJobFlow,EmrAddSteps}Operator attributes To dynamically template the fields of the Emr Operators, we need to pass the fields to jinja Closes #3016 from Swalloow/emr-jinjafied Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/44551e24 Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/44551e24 Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/44551e24 Branch: refs/heads/master Commit: 44551e249fd338f3c4d24ef95d4b9c021f3b0688 Parents: fd67721 Author: Swalloow <swalloow...@gmail.com> Authored: Fri Feb 9 10:20:02 2018 +0100 Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com> Committed: Fri Feb 9 10:20:06 2018 +0100 ---------------------------------------------------------------------- .../operators/emr_create_job_flow_operator.py | 2 +- .../operators/test_emr_add_steps_operator.py | 77 ++++++++++++++---- .../test_emr_create_job_flow_operator.py | 86 ++++++++++++++++---- 3 files changed, 134 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/44551e24/airflow/contrib/operators/emr_create_job_flow_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/emr_create_job_flow_operator.py b/airflow/contrib/operators/emr_create_job_flow_operator.py index 2544adf..8111800 100644 --- a/airflow/contrib/operators/emr_create_job_flow_operator.py +++ b/airflow/contrib/operators/emr_create_job_flow_operator.py @@ -29,7 +29,7 @@ class EmrCreateJobFlowOperator(BaseOperator): :param job_flow_overrides: boto3 style arguments to override emr_connection extra :type steps: dict """ - template_fields = [] + template_fields = ['job_flow_overrides'] template_ext = () ui_color = '#f9c915' 
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/44551e24/tests/contrib/operators/test_emr_add_steps_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_emr_add_steps_operator.py b/tests/contrib/operators/test_emr_add_steps_operator.py index 141e986..e5ac9fe 100644 --- a/tests/contrib/operators/test_emr_add_steps_operator.py +++ b/tests/contrib/operators/test_emr_add_steps_operator.py @@ -13,10 +13,16 @@ # limitations under the License. import unittest +from datetime import timedelta + from mock import MagicMock, patch -from airflow import configuration +from airflow import DAG, configuration from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator +from airflow.models import TaskInstance +from airflow.utils import timezone + +DEFAULT_DATE = timezone.datetime(2017, 1, 1) ADD_STEPS_SUCCESS_RETURN = { 'ResponseMetadata': { @@ -27,30 +33,71 @@ ADD_STEPS_SUCCESS_RETURN = { class TestEmrAddStepsOperator(unittest.TestCase): + # When + _config = [{ + 'Name': 'test_step', + 'ActionOnFailure': 'CONTINUE', + 'HadoopJarStep': { + 'Jar': 'command-runner.jar', + 'Args': [ + '/usr/lib/spark/bin/run-example', + '{{ macros.ds_add(ds, -1) }}', + '{{ ds }}' + ] + } + }] + def setUp(self): configuration.load_test_config() + args = { + 'owner': 'airflow', + 'start_date': DEFAULT_DATE + } # Mock out the emr_client (moto has incorrect response) - mock_emr_client = MagicMock() - mock_emr_client.add_job_flow_steps.return_value = ADD_STEPS_SUCCESS_RETURN + self.emr_client_mock = MagicMock() + self.operator = EmrAddStepsOperator( + task_id='test_task', + job_flow_id='j-8989898989', + aws_conn_id='aws_default', + steps=self._config, + dag=DAG('test_dag_id', default_args=args) + ) - mock_emr_session = MagicMock() - mock_emr_session.client.return_value = mock_emr_client + def test_init(self): + self.assertEqual(self.operator.job_flow_id, 'j-8989898989') + 
self.assertEqual(self.operator.aws_conn_id, 'aws_default') - # Mock out the emr_client creator - self.boto3_session_mock = MagicMock(return_value=mock_emr_session) + def test_render_template(self): + ti = TaskInstance(self.operator, DEFAULT_DATE) + ti.render_templates() + expected_args = [{ + 'Name': 'test_step', + 'ActionOnFailure': 'CONTINUE', + 'HadoopJarStep': { + 'Jar': 'command-runner.jar', + 'Args': [ + '/usr/lib/spark/bin/run-example', + (DEFAULT_DATE - timedelta(days=1)).strftime("%Y-%m-%d"), + DEFAULT_DATE.strftime("%Y-%m-%d"), + ] + } + }] - def test_execute_adds_steps_to_the_job_flow_and_returns_step_ids(self): - with patch('boto3.session.Session', self.boto3_session_mock): + self.assertListEqual(self.operator.steps, expected_args) - operator = EmrAddStepsOperator( - task_id='test_task', - job_flow_id='j-8989898989', - aws_conn_id='aws_default' - ) + def test_execute_returns_step_id(self): + self.emr_client_mock.add_job_flow_steps.return_value = ADD_STEPS_SUCCESS_RETURN + + # Mock out the emr_client creator + emr_session_mock = MagicMock() + emr_session_mock.client.return_value = self.emr_client_mock + self.boto3_session_mock = MagicMock(return_value=emr_session_mock) + + with patch('boto3.session.Session', self.boto3_session_mock): + self.assertEqual(self.operator.execute(None), ['s-2LH3R5GW3A53T']) - self.assertEqual(operator.execute(None), ['s-2LH3R5GW3A53T']) if __name__ == '__main__': unittest.main() http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/44551e24/tests/contrib/operators/test_emr_create_job_flow_operator.py ---------------------------------------------------------------------- diff --git a/tests/contrib/operators/test_emr_create_job_flow_operator.py b/tests/contrib/operators/test_emr_create_job_flow_operator.py index 9120aea..982a3ed 100644 --- a/tests/contrib/operators/test_emr_create_job_flow_operator.py +++ b/tests/contrib/operators/test_emr_create_job_flow_operator.py @@ -14,10 +14,16 @@ # import unittest +from datetime 
import timedelta + from mock import MagicMock, patch -from airflow import configuration +from airflow import DAG, configuration from airflow.contrib.operators.emr_create_job_flow_operator import EmrCreateJobFlowOperator +from airflow.models import TaskInstance +from airflow.utils import timezone + +DEFAULT_DATE = timezone.datetime(2017, 1, 1) RUN_JOB_FLOW_SUCCESS_RETURN = { 'ResponseMetadata': { @@ -26,31 +32,81 @@ RUN_JOB_FLOW_SUCCESS_RETURN = { 'JobFlowId': 'j-8989898989' } + class TestEmrCreateJobFlowOperator(unittest.TestCase): + # When + _config = { + 'Name': 'test_job_flow', + 'ReleaseLabel': '5.11.0', + 'Steps': [{ + 'Name': 'test_step', + 'ActionOnFailure': 'CONTINUE', + 'HadoopJarStep': { + 'Jar': 'command-runner.jar', + 'Args': [ + '/usr/lib/spark/bin/run-example', + '{{ macros.ds_add(ds, -1) }}', + '{{ ds }}' + ] + } + }] + } + def setUp(self): configuration.load_test_config() + args = { + 'owner': 'airflow', + 'start_date': DEFAULT_DATE + } # Mock out the emr_client (moto has incorrect response) - mock_emr_client = MagicMock() - mock_emr_client.run_job_flow.return_value = RUN_JOB_FLOW_SUCCESS_RETURN + self.emr_client_mock = MagicMock() + self.operator = EmrCreateJobFlowOperator( + task_id='test_task', + aws_conn_id='aws_default', + emr_conn_id='emr_default', + job_flow_overrides=self._config, + dag=DAG('test_dag_id', default_args=args) + ) - mock_emr_session = MagicMock() - mock_emr_session.client.return_value = mock_emr_client + def test_init(self): + self.assertEqual(self.operator.aws_conn_id, 'aws_default') + self.assertEqual(self.operator.emr_conn_id, 'emr_default') - # Mock out the emr_client creator - self.boto3_session_mock = MagicMock(return_value=mock_emr_session) + def test_render_template(self): + ti = TaskInstance(self.operator, DEFAULT_DATE) + ti.render_templates() + expected_args = { + 'Name': 'test_job_flow', + 'ReleaseLabel': '5.11.0', + 'Steps': [{ + 'Name': 'test_step', + 'ActionOnFailure': 'CONTINUE', + 'HadoopJarStep': { + 'Jar': 
'command-runner.jar', + 'Args': [ + '/usr/lib/spark/bin/run-example', + (DEFAULT_DATE - timedelta(days=1)).strftime("%Y-%m-%d"), + DEFAULT_DATE.strftime("%Y-%m-%d"), + ] + } + }] + } - def test_execute_uses_the_emr_config_to_create_a_cluster_and_returns_job_id(self): - with patch('boto3.session.Session', self.boto3_session_mock): + self.assertDictEqual(self.operator.job_flow_overrides, expected_args) + + def test_execute_returns_job_id(self): + self.emr_client_mock.run_job_flow.return_value = RUN_JOB_FLOW_SUCCESS_RETURN - operator = EmrCreateJobFlowOperator( - task_id='test_task', - aws_conn_id='aws_default', - emr_conn_id='emr_default' - ) + # Mock out the emr_client creator + emr_session_mock = MagicMock() + emr_session_mock.client.return_value = self.emr_client_mock + self.boto3_session_mock = MagicMock(return_value=emr_session_mock) + + with patch('boto3.session.Session', self.boto3_session_mock): + self.assertEqual(self.operator.execute(None), 'j-8989898989') - self.assertEqual(operator.execute(None), 'j-8989898989') if __name__ == '__main__': unittest.main()