This is an automated email from the ASF dual-hosted git repository.
potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new addbd587e2 Allow email field to be templated (#35546)
addbd587e2 is described below
commit addbd587e218a3ace09b08852094b336ecb33e9b
Author: Gevorg Davoian <[email protected]>
AuthorDate: Thu Nov 9 20:09:42 2023 +0200
Allow email field to be templated (#35546)
---
airflow/serialization/serialized_objects.py | 4 +++-
tests/serialization/test_dag_serialization.py | 29 +++++++++++++++++++--------
2 files changed, 24 insertions(+), 9 deletions(-)
diff --git a/airflow/serialization/serialized_objects.py b/airflow/serialization/serialized_objects.py
index ef35d843b4..d2a72cb1ca 100644
--- a/airflow/serialization/serialized_objects.py
+++ b/airflow/serialization/serialized_objects.py
@@ -878,10 +878,12 @@ class SerializedBaseOperator(BaseOperator, BaseSerialization):
# If not, store them as strings
# And raise an exception if the field is not templateable
forbidden_fields = set(inspect.signature(BaseOperator.__init__).parameters.keys())
+ # Though allow some of the BaseOperator fields to be templated anyway
+ forbidden_fields.difference_update({"email"})
if op.template_fields:
for template_field in op.template_fields:
if template_field in forbidden_fields:
- raise AirflowException(f"Cannot template BaseOperator fields: {template_field}")
+ raise AirflowException(f"Cannot template BaseOperator field: {template_field!r}")
value = getattr(op, template_field, None)
if not cls._is_excluded(value, template_field, op):
serialize_op[template_field] = serialize_template_field(value)
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index 4aecebcd1d..a9c63c80fe 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -2035,20 +2035,33 @@ class TestStringifiedDAGs:
assert param.description == "hello"
assert param.schema == {"type": "string"}
- def test_not_templateable_fields_in_serialized_dag(
- self,
- ):
+ @pytest.mark.db_test
+ def test_not_templateable_fields_in_serialized_dag(self):
"""
- Test that when we use not templateable fields, an Airflow exception is raised.
+ Test that when we use not templateable fields, an Airflow exception is raised.
"""
class TestOperator(BaseOperator):
- template_fields = ("execution_timeout",)
+ template_fields = (
+ "email", # templateable
+ "execution_timeout", # not templateable
+ )
+
+ def execute(self, context: Context):
+ pass
+
+ dag = DAG(dag_id="test_dag", start_date=datetime(2023, 11, 9))
- dag = DAG("test_not_templateable_fields", start_date=datetime(2019, 8, 1))
with dag:
- TestOperator(task_id="test", execution_timeout=timedelta(seconds=10))
- with pytest.raises(AirflowException, match="Cannot template BaseOperator fields: execution_timeout"):
+ task = TestOperator(
+ task_id="test_task",
+ email="{{ ','.join(test_email_list) }}",
+ execution_timeout=timedelta(seconds=10),
+ )
+ task.render_template_fields(context={"test_email_list": ["[email protected]", "[email protected]"]})
+ assert task.email == "[email protected],[email protected]"
+
+ with pytest.raises(AirflowException, match="Cannot template BaseOperator field: 'execution_timeout'"):
SerializedDAG.to_dict(dag)