This is an automated email from the ASF dual-hosted git repository.

potiuk pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/airflow.git


The following commit(s) were added to refs/heads/main by this push:
     new 7722b6f226 Ensure dynamic tasks inside dynamic task group only marks 
the (#32354)
7722b6f226 is described below

commit 7722b6f226e9db3a89b01b89db5fdb7a1ab2256f
Author: Karthikeyan Singaravelan <[email protected]>
AuthorDate: Wed Jul 5 14:08:29 2023 +0530

    Ensure dynamic tasks inside dynamic task group only marks the (#32354)
    
    corresponding EmptyOperator in downstream as success.
---
 airflow/models/dagrun.py    |  4 ++--
 tests/models/test_dagrun.py | 46 +++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 48 insertions(+), 2 deletions(-)

diff --git a/airflow/models/dagrun.py b/airflow/models/dagrun.py
index 8f3b3a3301..b0b56f3c6c 100644
--- a/airflow/models/dagrun.py
+++ b/airflow/models/dagrun.py
@@ -1338,7 +1338,7 @@ class DagRun(Base, LoggingMixin):
                 and not ti.task.on_success_callback
                 and not ti.task.outlets
             ):
-                dummy_ti_ids.append(ti.task_id)
+                dummy_ti_ids.append((ti.task_id, ti.map_index))
             else:
                 schedulable_ti_ids.append((ti.task_id, ti.map_index))
 
@@ -1369,7 +1369,7 @@ class DagRun(Base, LoggingMixin):
                     .where(
                         TI.dag_id == self.dag_id,
                         TI.run_id == self.run_id,
-                        TI.task_id.in_(dummy_ti_ids_chunk),
+                        tuple_in_condition((TI.task_id, TI.map_index), 
dummy_ti_ids_chunk),
                     )
                     .values(
                         state=TaskInstanceState.SUCCESS,
diff --git a/tests/models/test_dagrun.py b/tests/models/test_dagrun.py
index 5d7db547d3..c90ec1aef3 100644
--- a/tests/models/test_dagrun.py
+++ b/tests/models/test_dagrun.py
@@ -1811,6 +1811,52 @@ def test_mapped_task_group_expands_at_create(dag_maker, 
session):
     ]
 
 
+def test_mapped_task_group_empty_operator(dag_maker, session):
+    """
+    Test that dynamic task inside a dynamic task group only marks
+    the corresponding downstream EmptyOperator as success.
+    """
+
+    literal = [1, 2, 3]
+
+    with dag_maker(session=session) as dag:
+
+        @task_group
+        def tg(x):
+            @task
+            def t1(x):
+                return x
+
+            t2 = EmptyOperator(task_id="t2")
+
+            @task
+            def t3(x):
+                return x
+
+            t1(x) >> t2 >> t3(x)
+
+        tg.expand(x=literal)
+
+    dr = dag_maker.create_dagrun()
+
+    t2_task = dag.get_task("tg.t2")
+    t2_0 = dr.get_task_instance(task_id="tg.t2", map_index=0)
+    t2_0.refresh_from_task(t2_task)
+    assert t2_0.state is None
+
+    t2_1 = dr.get_task_instance(task_id="tg.t2", map_index=1)
+    t2_1.refresh_from_task(t2_task)
+    assert t2_1.state is None
+
+    dr.schedule_tis([t2_0])
+
+    t2_0 = dr.get_task_instance(task_id="tg.t2", map_index=0)
+    assert t2_0.state == TaskInstanceState.SUCCESS
+
+    t2_1 = dr.get_task_instance(task_id="tg.t2", map_index=1)
+    assert t2_1.state is None
+
+
 def test_ti_scheduling_mapped_zero_length(dag_maker, session):
     with dag_maker(session=session):
         task = BaseOperator(task_id="task_1")

Reply via email to