Repository: incubator-airflow
Updated Branches:
  refs/heads/master fd6772116 -> 44551e249


[AIRFLOW-713] Jinjafy {EmrCreateJobFlow,EmrAddSteps}Operator attributes

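To dynamically template the fields of the EMR operators, we need
to pass those fields to Jinja for rendering. This commit adds
`job_flow_overrides` to EmrCreateJobFlowOperator.template_fields
and adds template-rendering tests for both operators.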

Closes #3016 from Swalloow/emr-jinjafied


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/44551e24
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/44551e24
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/44551e24

Branch: refs/heads/master
Commit: 44551e249fd338f3c4d24ef95d4b9c021f3b0688
Parents: fd67721
Author: Swalloow <swalloow...@gmail.com>
Authored: Fri Feb 9 10:20:02 2018 +0100
Committer: Fokko Driesprong <fokkodriespr...@godatadriven.com>
Committed: Fri Feb 9 10:20:06 2018 +0100

----------------------------------------------------------------------
 .../operators/emr_create_job_flow_operator.py   |  2 +-
 .../operators/test_emr_add_steps_operator.py    | 77 ++++++++++++++----
 .../test_emr_create_job_flow_operator.py        | 86 ++++++++++++++++----
 3 files changed, 134 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/44551e24/airflow/contrib/operators/emr_create_job_flow_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/emr_create_job_flow_operator.py 
b/airflow/contrib/operators/emr_create_job_flow_operator.py
index 2544adf..8111800 100644
--- a/airflow/contrib/operators/emr_create_job_flow_operator.py
+++ b/airflow/contrib/operators/emr_create_job_flow_operator.py
@@ -29,7 +29,7 @@ class EmrCreateJobFlowOperator(BaseOperator):
     :param job_flow_overrides: boto3 style arguments to override 
emr_connection extra
     :type steps: dict
     """
-    template_fields = []
+    template_fields = ['job_flow_overrides']
     template_ext = ()
     ui_color = '#f9c915'
 

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/44551e24/tests/contrib/operators/test_emr_add_steps_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_emr_add_steps_operator.py 
b/tests/contrib/operators/test_emr_add_steps_operator.py
index 141e986..e5ac9fe 100644
--- a/tests/contrib/operators/test_emr_add_steps_operator.py
+++ b/tests/contrib/operators/test_emr_add_steps_operator.py
@@ -13,10 +13,16 @@
 # limitations under the License.
 
 import unittest
+from datetime import timedelta
+
 from mock import MagicMock, patch
 
-from airflow import configuration
+from airflow import DAG, configuration
 from airflow.contrib.operators.emr_add_steps_operator import 
EmrAddStepsOperator
+from airflow.models import TaskInstance
+from airflow.utils import timezone
+
+DEFAULT_DATE = timezone.datetime(2017, 1, 1)
 
 ADD_STEPS_SUCCESS_RETURN = {
     'ResponseMetadata': {
@@ -27,30 +33,71 @@ ADD_STEPS_SUCCESS_RETURN = {
 
 
 class TestEmrAddStepsOperator(unittest.TestCase):
+    # When
+    _config = [{
+        'Name': 'test_step',
+        'ActionOnFailure': 'CONTINUE',
+        'HadoopJarStep': {
+            'Jar': 'command-runner.jar',
+            'Args': [
+                '/usr/lib/spark/bin/run-example',
+                '{{ macros.ds_add(ds, -1) }}',
+                '{{ ds }}'
+            ]
+        }
+    }]
+
     def setUp(self):
         configuration.load_test_config()
+        args = {
+            'owner': 'airflow',
+            'start_date': DEFAULT_DATE
+        }
 
         # Mock out the emr_client (moto has incorrect response)
-        mock_emr_client = MagicMock()
-        mock_emr_client.add_job_flow_steps.return_value = 
ADD_STEPS_SUCCESS_RETURN
+        self.emr_client_mock = MagicMock()
+        self.operator = EmrAddStepsOperator(
+            task_id='test_task',
+            job_flow_id='j-8989898989',
+            aws_conn_id='aws_default',
+            steps=self._config,
+            dag=DAG('test_dag_id', default_args=args)
+        )
 
-        mock_emr_session = MagicMock()
-        mock_emr_session.client.return_value = mock_emr_client
+    def test_init(self):
+        self.assertEqual(self.operator.job_flow_id, 'j-8989898989')
+        self.assertEqual(self.operator.aws_conn_id, 'aws_default')
 
-        # Mock out the emr_client creator
-        self.boto3_session_mock = MagicMock(return_value=mock_emr_session)
+    def test_render_template(self):
+        ti = TaskInstance(self.operator, DEFAULT_DATE)
+        ti.render_templates()
 
+        expected_args = [{
+            'Name': 'test_step',
+            'ActionOnFailure': 'CONTINUE',
+            'HadoopJarStep': {
+                'Jar': 'command-runner.jar',
+                'Args': [
+                    '/usr/lib/spark/bin/run-example',
+                    (DEFAULT_DATE - timedelta(days=1)).strftime("%Y-%m-%d"),
+                    DEFAULT_DATE.strftime("%Y-%m-%d"),
+                ]
+            }
+        }]
 
-    def test_execute_adds_steps_to_the_job_flow_and_returns_step_ids(self):
-        with patch('boto3.session.Session', self.boto3_session_mock):
+        self.assertListEqual(self.operator.steps, expected_args)
 
-            operator = EmrAddStepsOperator(
-                task_id='test_task',
-                job_flow_id='j-8989898989',
-                aws_conn_id='aws_default'
-            )
+    def test_execute_returns_step_id(self):
+        self.emr_client_mock.add_job_flow_steps.return_value = 
ADD_STEPS_SUCCESS_RETURN
+
+        # Mock out the emr_client creator
+        emr_session_mock = MagicMock()
+        emr_session_mock.client.return_value = self.emr_client_mock
+        self.boto3_session_mock = MagicMock(return_value=emr_session_mock)
+
+        with patch('boto3.session.Session', self.boto3_session_mock):
+            self.assertEqual(self.operator.execute(None), ['s-2LH3R5GW3A53T'])
 
-            self.assertEqual(operator.execute(None), ['s-2LH3R5GW3A53T'])
 
 if __name__ == '__main__':
     unittest.main()

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/44551e24/tests/contrib/operators/test_emr_create_job_flow_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_emr_create_job_flow_operator.py 
b/tests/contrib/operators/test_emr_create_job_flow_operator.py
index 9120aea..982a3ed 100644
--- a/tests/contrib/operators/test_emr_create_job_flow_operator.py
+++ b/tests/contrib/operators/test_emr_create_job_flow_operator.py
@@ -14,10 +14,16 @@
 #
 
 import unittest
+from datetime import timedelta
+
 from mock import MagicMock, patch
 
-from airflow import configuration
+from airflow import DAG, configuration
 from airflow.contrib.operators.emr_create_job_flow_operator import 
EmrCreateJobFlowOperator
+from airflow.models import TaskInstance
+from airflow.utils import timezone
+
+DEFAULT_DATE = timezone.datetime(2017, 1, 1)
 
 RUN_JOB_FLOW_SUCCESS_RETURN = {
     'ResponseMetadata': {
@@ -26,31 +32,81 @@ RUN_JOB_FLOW_SUCCESS_RETURN = {
     'JobFlowId': 'j-8989898989'
 }
 
+
 class TestEmrCreateJobFlowOperator(unittest.TestCase):
+    # When
+    _config = {
+        'Name': 'test_job_flow',
+        'ReleaseLabel': '5.11.0',
+        'Steps': [{
+            'Name': 'test_step',
+            'ActionOnFailure': 'CONTINUE',
+            'HadoopJarStep': {
+                'Jar': 'command-runner.jar',
+                'Args': [
+                    '/usr/lib/spark/bin/run-example',
+                    '{{ macros.ds_add(ds, -1) }}',
+                    '{{ ds }}'
+                ]
+            }
+        }]
+    }
+
     def setUp(self):
         configuration.load_test_config()
+        args = {
+            'owner': 'airflow',
+            'start_date': DEFAULT_DATE
+        }
 
         # Mock out the emr_client (moto has incorrect response)
-        mock_emr_client = MagicMock()
-        mock_emr_client.run_job_flow.return_value = RUN_JOB_FLOW_SUCCESS_RETURN
+        self.emr_client_mock = MagicMock()
+        self.operator = EmrCreateJobFlowOperator(
+            task_id='test_task',
+            aws_conn_id='aws_default',
+            emr_conn_id='emr_default',
+            job_flow_overrides=self._config,
+            dag=DAG('test_dag_id', default_args=args)
+        )
 
-        mock_emr_session = MagicMock()
-        mock_emr_session.client.return_value = mock_emr_client
+    def test_init(self):
+        self.assertEqual(self.operator.aws_conn_id, 'aws_default')
+        self.assertEqual(self.operator.emr_conn_id, 'emr_default')
 
-        # Mock out the emr_client creator
-        self.boto3_session_mock = MagicMock(return_value=mock_emr_session)
+    def test_render_template(self):
+        ti = TaskInstance(self.operator, DEFAULT_DATE)
+        ti.render_templates()
 
+        expected_args = {
+            'Name': 'test_job_flow',
+            'ReleaseLabel': '5.11.0',
+            'Steps': [{
+                'Name': 'test_step',
+                'ActionOnFailure': 'CONTINUE',
+                'HadoopJarStep': {
+                    'Jar': 'command-runner.jar',
+                    'Args': [
+                        '/usr/lib/spark/bin/run-example',
+                        (DEFAULT_DATE - 
timedelta(days=1)).strftime("%Y-%m-%d"),
+                        DEFAULT_DATE.strftime("%Y-%m-%d"),
+                    ]
+                }
+            }]
+        }
 
-    def 
test_execute_uses_the_emr_config_to_create_a_cluster_and_returns_job_id(self):
-        with patch('boto3.session.Session', self.boto3_session_mock):
+        self.assertDictEqual(self.operator.job_flow_overrides, expected_args)
+
+    def test_execute_returns_job_id(self):
+        self.emr_client_mock.run_job_flow.return_value = 
RUN_JOB_FLOW_SUCCESS_RETURN
 
-            operator = EmrCreateJobFlowOperator(
-                task_id='test_task',
-                aws_conn_id='aws_default',
-                emr_conn_id='emr_default'
-            )
+        # Mock out the emr_client creator
+        emr_session_mock = MagicMock()
+        emr_session_mock.client.return_value = self.emr_client_mock
+        self.boto3_session_mock = MagicMock(return_value=emr_session_mock)
+
+        with patch('boto3.session.Session', self.boto3_session_mock):
+            self.assertEqual(self.operator.execute(None), 'j-8989898989')
 
-            self.assertEqual(operator.execute(None), 'j-8989898989')
 
 if __name__ == '__main__':
     unittest.main()

Reply via email to