This is an automated email from the ASF dual-hosted git repository.

uranusjr pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new d0d7693a3b Disable nested task mapping for now (#27681)
d0d7693a3b is described below

commit d0d7693a3bfa6653077e4c746562f849a337d616
Author: Tzu-ping Chung <[email protected]>
AuthorDate: Wed Nov 16 08:21:20 2022 +0800

    Disable nested task mapping for now (#27681)
---
 airflow/models/mappedoperator.py              |  3 +++
 tests/models/test_dagrun.py                   | 18 +++++++++++-------
 tests/serialization/test_dag_serialization.py |  6 ++++--
 3 files changed, 18 insertions(+), 9 deletions(-)

diff --git a/airflow/models/mappedoperator.py b/airflow/models/mappedoperator.py
index 26ca9ff4bb..ad99d8756a 100644
--- a/airflow/models/mappedoperator.py
+++ b/airflow/models/mappedoperator.py
@@ -309,6 +309,9 @@ class MappedOperator(AbstractOperator):
     def __attrs_post_init__(self):
         from airflow.models.xcom_arg import XComArg
 
+        if next(self.iter_mapped_task_groups(), None) is not None:
+            raise NotImplementedError("operator expansion in an expanded task 
group is not yet supported")
+
         if self.task_group:
             self.task_group.add(self)
         if self.dag:
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 6295dbcf90..7c2a8a8866 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -1700,11 +1700,15 @@ def test_mapped_task_group_expands_at_create(dag_maker, 
session):
             # Normal operator in mapped task group, expands to 2 tis.
             MockOperator(task_id="t1")
             # Mapped operator expands *again* against mapped task group 
arguments to 4 tis.
-            MockOperator.partial(task_id="t2").expand(arg1=literal)
+            with pytest.raises(NotImplementedError) as ctx:
+                MockOperator.partial(task_id="t2").expand(arg1=literal)
+            assert str(ctx.value) == "operator expansion in an expanded task 
group is not yet supported"
             # Normal operator referencing mapped task group arguments does not 
further expand, only 2 tis.
             MockOperator(task_id="t3", arg1=x)
             # It can expand *again* (since each item in x is a list) but this 
is not done at parse time.
-            MockOperator.partial(task_id="t4").expand(arg1=x)
+            with pytest.raises(NotImplementedError) as ctx:
+                MockOperator.partial(task_id="t4").expand(arg1=x)
+            assert str(ctx.value) == "operator expansion in an expanded task 
group is not yet supported"
 
         tg.expand(x=literal)
 
@@ -1717,13 +1721,13 @@ def test_mapped_task_group_expands_at_create(dag_maker, 
session):
     assert query.all() == [
         ("tg.t1", 0, None),
         ("tg.t1", 1, None),
-        ("tg.t2", 0, None),
-        ("tg.t2", 1, None),
-        ("tg.t2", 2, None),
-        ("tg.t2", 3, None),
+        # ("tg.t2", 0, None),
+        # ("tg.t2", 1, None),
+        # ("tg.t2", 2, None),
+        # ("tg.t2", 3, None),
         ("tg.t3", 0, None),
         ("tg.t3", 1, None),
-        ("tg.t4", -1, None),
+        # ("tg.t4", -1, None),
     ]
 
 
diff --git a/tests/serialization/test_dag_serialization.py 
b/tests/serialization/test_dag_serialization.py
index da61564976..994ba12b32 100644
--- a/tests/serialization/test_dag_serialization.py
+++ b/tests/serialization/test_dag_serialization.py
@@ -2296,7 +2296,9 @@ def test_mapped_task_group_serde():
         @task_group
         def tg(a: str) -> None:
             BaseOperator(task_id="op1")
-            BashOperator.partial(task_id="op2").expand(bash_command=["ls", a])
+            with pytest.raises(NotImplementedError) as ctx:
+                BashOperator.partial(task_id="op2").expand(bash_command=["ls", 
a])
+            assert str(ctx.value) == "operator expansion in an expanded task 
group is not yet supported"
 
         tg.expand(a=[".", ".."])
 
@@ -2307,7 +2309,7 @@ def test_mapped_task_group_serde():
             "_group_id": "tg",
             "children": {
                 "tg.op1": ("operator", "tg.op1"),
-                "tg.op2": ("operator", "tg.op2"),
+                # "tg.op2": ("operator", "tg.op2"),
             },
             "downstream_group_ids": [],
             "downstream_task_ids": [],

Reply via email to