This is an automated email from the ASF dual-hosted git repository.
uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git
The following commit(s) were added to refs/heads/main by this push:
new d0d7693a3b Disable nested task mapping for now (#27681)
d0d7693a3b is described below
commit d0d7693a3bfa6653077e4c746562f849a337d616
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Wed Nov 16 08:21:20 2022 +0800
Disable nested task mapping for now (#27681)
---
airflow/models/mappedoperator.py | 3 +++
tests/models/test_dagrun.py | 18 +++++++++++-------
tests/serialization/test_dag_serialization.py | 6 ++++--
3 files changed, 18 insertions(+), 9 deletions(-)
diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py
index 26ca9ff4bb..ad99d8756a 100644
--- a/airflow/models/mappedoperator.py
+++ b/airflow/models/mappedoperator.py
@@ -309,6 +309,9 @@ class MappedOperator(AbstractOperator):
def __attrs_post_init__(self):
from airflow.models.xcom_arg import XComArg
+ if next(self.iter_mapped_task_groups(), None) is not None:
+ raise NotImplementedError("operator expansion in an expanded task group is not yet supported")
+
if self.task_group:
self.task_group.add(self)
if self.dag:
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 6295dbcf90..7c2a8a8866 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -1700,11 +1700,15 @@ def test_mapped_task_group_expands_at_create(dag_maker, session):
# Normal operator in mapped task group, expands to 2 tis.
MockOperator(task_id="t1")
# Mapped operator expands *again* against mapped task group arguments to 4 tis.
- MockOperator.partial(task_id="t2").expand(arg1=literal)
+ with pytest.raises(NotImplementedError) as ctx:
+ MockOperator.partial(task_id="t2").expand(arg1=literal)
+ assert str(ctx.value) == "operator expansion in an expanded task group is not yet supported"
# Normal operator referencing mapped task group arguments does not further expand, only 2 tis.
MockOperator(task_id="t3", arg1=x)
# It can expand *again* (since each item in x is a list) but this is not done at parse time.
- MockOperator.partial(task_id="t4").expand(arg1=x)
+ with pytest.raises(NotImplementedError) as ctx:
+ MockOperator.partial(task_id="t4").expand(arg1=x)
+ assert str(ctx.value) == "operator expansion in an expanded task group is not yet supported"
tg.expand(x=literal)
@@ -1717,13 +1721,13 @@ def test_mapped_task_group_expands_at_create(dag_maker, session):
assert query.all() == [
("tg.t1", 0, None),
("tg.t1", 1, None),
- ("tg.t2", 0, None),
- ("tg.t2", 1, None),
- ("tg.t2", 2, None),
- ("tg.t2", 3, None),
+ # ("tg.t2", 0, None),
+ # ("tg.t2", 1, None),
+ # ("tg.t2", 2, None),
+ # ("tg.t2", 3, None),
("tg.t3", 0, None),
("tg.t3", 1, None),
- ("tg.t4", -1, None),
+ # ("tg.t4", -1, None),
]
diff --git a/tests/serialization/test_dag_serialization.py b/tests/serialization/test_dag_serialization.py
index da61564976..994ba12b32 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -2296,7 +2296,9 @@ def test_mapped_task_group_serde():
@task_group
def tg(a: str) -> None:
BaseOperator(task_id="op1")
- BashOperator.partial(task_id="op2").expand(bash_command=["ls", a])
+ with pytest.raises(NotImplementedError) as ctx:
+ BashOperator.partial(task_id="op2").expand(bash_command=["ls", a])
+ assert str(ctx.value) == "operator expansion in an expanded task group is not yet supported"
tg.expand(a=[".", ".."])
@@ -2307,7 +2309,7 @@ def test_mapped_task_group_serde():
"_group_id": "tg",
"children": {
"tg.op1": ("operator", "tg.op1"),
- "tg.op2": ("operator", "tg.op2"),
+ # "tg.op2": ("operator", "tg.op2"),
},
"downstream_group_ids": [],
"downstream_task_ids": [],