This is an automated email from the ASF dual-hosted git repository.
jedcunningham pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new 3849ebb8d2 Fix auto upstream dep when expanding non-templated field (#23771)
3849ebb8d2 is described below
commit 3849ebb8d22bbc229d464c4171c9b5ff960cd089
Author: Jed Cunningham <[email protected]>
AuthorDate: Wed May 18 13:43:16 2022 -0600
Fix auto upstream dep when expanding non-templated field (#23771)
If you tried to expand via xcom into a non-templated field without
explicitly setting the upstream task dependency, the scheduler would
crash because the upstream task dependency wasn't being set
automatically. It was being set only for templated fields, but now we do
it for both.
---
airflow/models/mappedoperator.py | 3 +--
tests/models/test_taskinstance.py | 20 ++++++++++++++++++++
2 files changed, 21 insertions(+), 2 deletions(-)
diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py
index b63e26ec9e..c522cefb2c 100644
--- a/airflow/models/mappedoperator.py
+++ b/airflow/models/mappedoperator.py
@@ -300,8 +300,7 @@ class MappedOperator(AbstractOperator):
if self.dag:
self.dag.add_task(self)
for k, v in self.mapped_kwargs.items():
- if k in self.template_fields:
- XComArg.apply_upstream_relationship(self, v)
+ XComArg.apply_upstream_relationship(self, v)
for k, v in self.partial_kwargs.items():
if k in self.template_fields:
XComArg.apply_upstream_relationship(self, v)
diff --git a/tests/models/test_taskinstance.py b/tests/models/test_taskinstance.py
index 220c04d7cc..975d44cf92 100644
--- a/tests/models/test_taskinstance.py
+++ b/tests/models/test_taskinstance.py
@@ -2835,3 +2835,23 @@ def test_ti_mapped_depends_on_mapped_xcom_arg_XXX(dag_maker, session):
ti.refresh_from_task(dag.get_task("add_one"))
with pytest.raises(XComForMappingNotPushed):
ti.run()
+
+
+def test_expand_non_templated_field(dag_maker, session):
+ """Test expand on non-templated fields sets upstream deps properly."""
+
+ class SimpleBashOperator(BashOperator):
+ template_fields = ()
+
+ with dag_maker(dag_id="product_same_types", session=session) as dag:
+
+ @dag.task
+ def get_extra_env():
+ return [{"foo": "bar"}, {"foo": "biz"}]
+
+ SimpleBashOperator.partial(task_id="echo", bash_command="echo $FOO").expand(env=get_extra_env())
+
+ dag_maker.create_dagrun()
+
+ echo_task = dag.get_task("echo")
+ assert "get_extra_env" in echo_task.upstream_task_ids