ashb commented on a change in pull request #4743: [AIRFLOW-3871] render 
Operators template fields recursively
URL: https://github.com/apache/airflow/pull/4743#discussion_r310050848
 
 

 ##########
 File path: tests/operators/test_python_operator.py
 ##########
 @@ -242,6 +249,510 @@ def test_echo_env_variables(self):
                            )
         t.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
+    def test_template_field_value_rendering(self):
+        """Test PythonOperator template field with a simple templated string"""
+        recorded_calls = []
+
+        task = PythonOperator(
+            task_id='python_operator',
+            python_callable=build_recording_function(recorded_calls),
+            provide_context=True,
+            templates_dict={
+                'a_templated_string': "dag {{dag.dag_id}} ran on {{ds}}."
+            },
+            dag=self.dag)
+
+        self.dag.create_dagrun(
+            run_id='manual__' + DEFAULT_DATE.isoformat(),
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            state=State.RUNNING
+        )
+        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        self.assertEqual(1, len(recorded_calls))
+        self.assertTrue('templates_dict' in recorded_calls[0].kwargs)
+        self.assertDictEqual(
+            recorded_calls[0].kwargs['templates_dict'],
+            {'a_templated_string': "dag {} ran on {}.".format(self.dag.dag_id, 
DEFAULT_DATE.date())}
+        )
+
+    def test_template_field_object_rendering(self):
+        """Test PythonOperator template field with a nested templated string 
attribute"""
+        recorded_calls = []
+        ClassWithCustomAttributes.template_fields = ['templated_string_attr']
+
+        task = PythonOperator(
+            task_id='python_operator',
+            python_callable=build_recording_function(recorded_calls),
+            provide_context=True,
+            templates_dict={
+                'a_templated_object': ClassWithCustomAttributes(
+                    templated_string_attr="dag {{dag.dag_id}} ran on {{ds}}."
+                )
+            },
+            dag=self.dag)
+
+        self.dag.create_dagrun(
+            run_id='manual__' + DEFAULT_DATE.isoformat(),
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            state=State.RUNNING
+        )
+        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        self.assertEqual(1, len(recorded_calls))
+        self.assertTrue('templates_dict' in recorded_calls[0].kwargs)
+        self.assertDictEqual(
+            recorded_calls[0].kwargs['templates_dict'],
+            {
+                'a_templated_object': ClassWithCustomAttributes(
+                    templated_string_attr="dag {} ran on 
{}.".format(self.dag.dag_id, DEFAULT_DATE.date())
+                )
+            }
+        )
+
+    def test_multiple_template_fields_rendering(self):
+        """Test PythonOperator template field with multiple templated and non 
templated values"""
+        recorded_calls = []
+        ClassWithCustomAttributes.template_fields = ['templated_string_attr']
+
+        task = PythonOperator(
+            task_id='python_operator',
+            python_callable=build_recording_function(recorded_calls),
+            provide_context=True,
+            templates_dict={
+                'an_int': 4,
+                'a_date': date(2019, 2, 18),
+                'a_string': "this is a static string",
+                'a_templated_string': "this is a templated string for dag 
{{dag.dag_id}}",
+                'an_object': ClassWithCustomAttributes(
+                    templated_string_attr="static string here",
+                    static_string_attr='static string',
+                    int_attr=5,
+                    date_attr=date(2019, 2, 19)
+                ),
+                'a_templated_object': ClassWithCustomAttributes(
+                    templated_string_attr="dag {{dag.dag_id}}",
+                    static_string_attr="static_string",
+                    int_attr=6,
+                    date_attr=date(2019, 2, 20)
+                )
+            },
+            dag=self.dag)
+
+        self.dag.create_dagrun(
+            run_id='manual__' + DEFAULT_DATE.isoformat(),
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            state=State.RUNNING
+        )
+        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        self.assertEqual(1, len(recorded_calls))
+        self.assertTrue('templates_dict' in recorded_calls[0].kwargs)
+        self.assertDictEqual(
+            recorded_calls[0].kwargs['templates_dict'],
+            {
+                'an_int': 4,
+                'a_date': date(2019, 2, 18),
+                'a_string': "this is a static string",
+                'a_templated_string': "this is a templated string for dag 
{}".format(self.dag.dag_id),
+                'an_object': ClassWithCustomAttributes(
+                    templated_string_attr="static string here",
+                    static_string_attr='static string',
+                    int_attr=5,
+                    date_attr=date(2019, 2, 19)
+                ),
+                'a_templated_object': ClassWithCustomAttributes(
+                    templated_string_attr="dag {}".format(self.dag.dag_id),
+                    static_string_attr="static_string",
+                    int_attr=6,
+                    date_attr=date(2019, 2, 20)
+                )
+            }
+        )
+
+    def 
test_inner_attributes_are_rendered_when_marked_as_template_fields_only(self):
+        recorded_calls = []
+        ClassWithCustomAttributes.template_fields = ['templated_string_attr']
+
+        task = PythonOperator(
+            task_id='python_operator',
+            python_callable=build_recording_function(recorded_calls),
+            provide_context=True,
+            templates_dict={
+                'a_templated_object': ClassWithCustomAttributes(
+                    templated_string_attr="dag {{dag.dag_id}} ran on {{ds}}.",
+                    static_string_attr="dag {{dag.dag_id}} ran on {{ds}}."
+                )
+            },
+            dag=self.dag)
+
+        self.dag.create_dagrun(
+            run_id='manual__' + DEFAULT_DATE.isoformat(),
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            state=State.RUNNING
+        )
+        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        self.assertEqual(1, len(recorded_calls))
+        self.assertTrue('templates_dict' in recorded_calls[0].kwargs)
+        self.assertDictEqual(
+            recorded_calls[0].kwargs['templates_dict'],
+            {
+                'a_templated_object': ClassWithCustomAttributes(
+                    templated_string_attr="dag {} ran on 
{}.".format(self.dag.dag_id, DEFAULT_DATE.date()),
+                    static_string_attr="dag {{dag.dag_id}} ran on {{ds}}."
+                )
+            }
+        )
+
+    def 
test_inner_attributes_are_not_rendered_when_object_has_no_template_fields(self):
+        recorded_calls = []
+
+        task = PythonOperator(
+            task_id='python_operator',
+            python_callable=build_recording_function(recorded_calls),
+            provide_context=True,
+            templates_dict={
+                'a_templated_string': "dag {{dag.dag_id}} ran on {{ds}}.",
+                'a_non_templated_object': ClassWithCustomAttributes(
+                    templated_string_attr="dag {{dag.dag_id}} ran on {{ds}}."
+                )
+            },
+            dag=self.dag)
+
+        self.dag.create_dagrun(
+            run_id='manual__' + DEFAULT_DATE.isoformat(),
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            state=State.RUNNING
+        )
+        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        self.assertEqual(1, len(recorded_calls))
+        self.assertTrue('templates_dict' in recorded_calls[0].kwargs)
+        self.assertDictEqual(
+            recorded_calls[0].kwargs['templates_dict'],
+            {
+                'a_templated_string': "dag {} ran on 
{}.".format(self.dag.dag_id, DEFAULT_DATE.date()),
+                'a_non_templated_object': ClassWithCustomAttributes(
+                    templated_string_attr="dag {{dag.dag_id}} ran on {{ds}}."
+                )
+            }
+        )
+
+    def test_template_can_be_rendered_recursively_on_deep_attributes(self):
+        recorded_calls = []
+        ClassWithCustomAttributes.template_fields = ['templated_string_attr', 
'ref']
+
+        object1 = ClassWithCustomAttributes(
+            templated_string_attr="object 1 on dag {{dag.dag_id}}",
+            ref=None
+        )
+        object2 = ClassWithCustomAttributes(
+            templated_string_attr="object 2 on dag {{dag.dag_id}}",
+            ref=object1
+        )
+
+        task = PythonOperator(
+            task_id='python_operator',
+            python_callable=build_recording_function(recorded_calls),
+            provide_context=True,
+            templates_dict={
+                'a_templated_object': object2
+            },
+            dag=self.dag)
+
+        self.dag.create_dagrun(
+            run_id='manual__' + DEFAULT_DATE.isoformat(),
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            state=State.RUNNING
+        )
+        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        self.assertEqual(1, len(recorded_calls))
+        self.assertTrue('templates_dict' in recorded_calls[0].kwargs)
+        self.assertDictEqual(
+            recorded_calls[0].kwargs['templates_dict'],
+            {
+                'a_templated_object': ClassWithCustomAttributes(
+                    templated_string_attr="object 2 on dag 
{}".format(self.dag.dag_id),
+                    ref=ClassWithCustomAttributes(
+                        templated_string_attr="object 1 on dag 
{}".format(self.dag.dag_id),
+                        ref=None
+                    )
+                )
+            }
+        )
+
+    def test_nested_template_fields_can_be_defined_at_the_class_level(self):
+        recorded_calls = []
+        ClassWithCustomAttributes.template_fields = ['templated_string_attr']
+
+        object1 = ClassWithCustomAttributes(
+            templated_string_attr="object 1 on dag {{dag.dag_id}}"
+        )
+        object2 = ClassWithCustomAttributes(
+            templated_string_attr="object 2 on dag {{dag.dag_id}}"
+        )
+
+        task = PythonOperator(
+            task_id='python_operator',
+            python_callable=build_recording_function(recorded_calls),
+            provide_context=True,
+            templates_dict={
+                'an_object': object1,
+                'another_object': object2
+            },
+            dag=self.dag)
+
+        self.dag.create_dagrun(
+            run_id='manual__' + DEFAULT_DATE.isoformat(),
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            state=State.RUNNING
+        )
+        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        self.assertEqual(1, len(recorded_calls))
+        self.assertTrue('templates_dict' in recorded_calls[0].kwargs)
+        self.assertDictEqual(
+            recorded_calls[0].kwargs['templates_dict'],
+            {
+                'an_object': ClassWithCustomAttributes(
+                    templated_string_attr="object 1 on dag 
{}".format(self.dag.dag_id)
+                ),
+                'another_object': ClassWithCustomAttributes(
+                    templated_string_attr="object 2 on dag 
{}".format(self.dag.dag_id)
+                )
+            }
+        )
+
+    def test_nested_template_fields_can_be_defined_at_the_instance_level(self):
+        recorded_calls = []
+
+        object1 = ClassWithCustomAttributes(
+            templated_string_attr="object 1 on dag {{dag.dag_id}}",
+            template_fields=['templated_string_attr']
+        )
+        object2 = ClassWithCustomAttributes(
+            templated_string_attr="object 2 on dag {{dag.dag_id}}",
+        )
+
+        task = PythonOperator(
+            task_id='python_operator',
+            python_callable=build_recording_function(recorded_calls),
+            provide_context=True,
+            templates_dict={
+                'an_object': object1,
+                'another_object': object2
+            },
+            dag=self.dag)
+
+        self.dag.create_dagrun(
+            run_id='manual__' + DEFAULT_DATE.isoformat(),
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            state=State.RUNNING
+        )
+        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
+
+        self.assertEqual(1, len(recorded_calls))
+        self.assertTrue('templates_dict' in recorded_calls[0].kwargs)
+        self.assertDictEqual(
+            recorded_calls[0].kwargs['templates_dict'],
+            {
+                'an_object': ClassWithCustomAttributes(
+                    templated_string_attr="object 1 on dag 
{}".format(self.dag.dag_id),
+                    template_fields=['templated_string_attr']
+                ),
+                'another_object': ClassWithCustomAttributes(
+                    templated_string_attr="object 2 on dag {{dag.dag_id}}"
+                )
+            }
+        )
+
+    def 
test_template_are_not_rendered_recursively_when_reference_is_not_a_template_field(self):
+        recorded_calls = []
+        ClassWithCustomAttributes.template_fields = ['templated_string_attr']
+
+        object1 = ClassWithCustomAttributes(
+            templated_string_attr="object 1 on dag {{dag.dag_id}}",
+            ref=None
+        )
+        object2 = ClassWithCustomAttributes(
+            templated_string_attr="object 2 on dag {{dag.dag_id}}",
+            ref=object1
+        )
+
+        task = PythonOperator(
+            task_id='python_operator',
+            python_callable=build_recording_function(recorded_calls),
+            provide_context=True,
+            templates_dict={
+                'a_templated_object': object2
+            },
+            dag=self.dag)
+
+        self.dag.create_dagrun(
+            run_id='manual__' + DEFAULT_DATE.isoformat(),
+            execution_date=DEFAULT_DATE,
+            start_date=DEFAULT_DATE,
+            state=State.RUNNING
+        )
+        task.run(start_date=DEFAULT_DATE, end_date=DEFAULT_DATE)
 
 Review comment:
   Once the template refactor PR is merged, we should change all of these 
tests to call `render_template_fields` directly instead of running the task.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to